Starburst Kafka connector#
The Starburst Kafka connector included in Starburst Enterprise platform (SEP) is an extended version of the Kafka connector, with identical configuration and usage. It includes the following features:
- TLS/SSL encryption (1-way SSL)
- Additional authentication and access control mechanisms:
  - TLS/SSL authentication (2-way SSL)
  - Basic authentication
  - OAuth 2.0
  - OAuth 2.0 token pass-through
  - SCRAM
  - Kerberos
- Encoder and decoder for Protobuf messages
Requirements#
Fulfill the Kafka connector requirements.
Additional features of the connector require a valid Starburst Enterprise license, unless otherwise noted.
Configuration#
The connector configuration is identical to the configuration for the base Kafka connector.
A minimal configuration sets connector.name to kafka, and adds configuration for nodes and table names as shown in the following snippet:
connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port
Security for schema registry access#
The connector supports table schema and schema registry usage, and includes a number of security-related features, detailed in the following sections.
TLS/SSL authentication#
Typically your schema registry is secured with TLS/SSL, and therefore accessed securely with the HTTPS protocol. The connector supports the 2-way authentication used by the protocol, if you enable the HTTPS protocol in your catalog properties file:
kafka.confluent-schema-registry.security-protocol=HTTPS
If the TLS certificates on the schema registry and on SEP are signed by a trusted certificate authority, they are recognized as such, and no further configuration is necessary.
If you use a custom certificate, you have to add the relevant certificates to a truststore and keystore for SEP, place these files on your cluster nodes, and configure the relevant properties:
Property name | Description
---|---
kafka.confluent-schema-registry.ssl.truststore.location | Location of the truststore file. Absolute path, or relative path.
kafka.confluent-schema-registry.ssl.truststore.password | Password to the truststore file.
kafka.confluent-schema-registry.ssl.truststore.type | The file format of the truststore key.
kafka.confluent-schema-registry.ssl.keystore.location | Location of the keystore file. Absolute path, or relative path.
kafka.confluent-schema-registry.ssl.keystore.password | Password to the keystore file.
kafka.confluent-schema-registry.ssl.keystore.type | The file format of the keystore key.
kafka.confluent-schema-registry.ssl.key.password | Password of the private key stored in the keystore file.
You can use the secrets support to avoid plain text password values in the catalog file.
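For example, a catalog configured for a schema registry that uses a custom certificate might look like the following sketch. The file path is a placeholder, the property names follow the table above, and the truststore password is read through secrets support from an environment variable:
kafka.confluent-schema-registry.security-protocol=HTTPS
kafka.confluent-schema-registry.ssl.truststore.location=/etc/secrets/schema-registry.truststore.jks
kafka.confluent-schema-registry.ssl.truststore.password=${ENV:SCHEMA_REGISTRY_TRUSTSTORE_PASSWORD}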
Basic authentication#
The schema registry can be configured to require users to authenticate using a username and password via the basic HTTP authentication mechanism. The connector supports the Basic authentication used by the schema registry, if you enable the PASSWORD authentication type and relevant properties in your catalog properties file:
kafka.confluent-schema-registry.authentication.type=PASSWORD
kafka.confluent-schema-registry.authentication.username=examplename
kafka.confluent-schema-registry.authentication.password=examplepassword
Security#
The connector includes a number of security-related features, detailed in the following sections.
Password credential pass-through#
The connector supports password credential pass-through. To enable it, edit the catalog properties file to include the authentication type:
kafka.authentication.type=PASSWORD_PASS_THROUGH
For more information about configurations and limitations, see Password credential pass-through.
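As a minimal catalog sketch, the pass-through authentication type is combined with the SASL security protocol; the coordinator-side authentication setup required for pass-through is covered in the linked documentation:
connector.name=kafka
kafka.security-protocol=SASL_SSL
kafka.authentication.type=PASSWORD_PASS_THROUGH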
TLS/SSL encryption#
By default the connector communicates with the Kafka server using the PLAINTEXT protocol, which means sent data is not encrypted. To encrypt the communication between the connector and the server, change the kafka.security-protocol configuration property to one of the following values:
- SSL - to connect to the server without any authentication, or when using SSL authentication (2-way SSL)
- SASL_SSL - to connect to the server using SASL authentication
In addition, you can set the following optional configuration properties:
Property name | Description
---|---
kafka.ssl.truststore.location | Location of the truststore file
kafka.ssl.truststore.password | Password to the truststore file
kafka.ssl.endpoint-identification-algorithm | The endpoint identification algorithm used by SEP to validate the server host name. The default value is https.
You can see a full example configuration with SSL
encryption in the
following snippet:
connector.name=kafka
...
kafka.security-protocol=SSL
kafka.ssl.truststore.location=/etc/secrets/kafka.broker.truststore.jks
kafka.ssl.truststore.password=truststore_password
TLS/SSL authentication#
With TLS/SSL authentication, also called 2-way authentication, the connector authenticates itself to the Kafka server/broker. Add the following configuration to your catalog file to use TLS/SSL:
kafka.security-protocol=SSL
You must set the following required configuration properties:
Property name | Description
---|---
kafka.ssl.keystore.location | Location of the keystore file
kafka.ssl.keystore.password | Password to the keystore file
kafka.ssl.key.password | Password of the private key stored in the keystore file
You can see a full example configuration using SSL authentication in the following snippet:
connector.name=kafka
...
kafka.security-protocol=SSL
kafka.ssl.keystore.location=/etc/secrets/kafka.broker.keystore.jks
kafka.ssl.keystore.password=keystore_password
kafka.ssl.key.password=private_key_password
SASL authentication#
With SASL authentication, the connector authenticates with the Kafka server using one of the following supported authentication mechanisms:
Authentication mechanism name | Corresponding Kafka SASL mechanism | Documentation
---|---|---
PASSWORD | PLAIN | Password authentication
KERBEROS | GSSAPI | Kerberos authentication
OAUTH2 | OAUTHBEARER | OAuth 2.0 authentication
OAUTH2_PASSTHROUGH | OAUTHBEARER | OAuth 2.0 token pass-through
SCRAM_SHA_256 | SCRAM-SHA-256 | SCRAM authentication
SCRAM_SHA_512 | SCRAM-SHA-512 | SCRAM authentication
SASL authentication can be enabled for both the PLAINTEXT and SSL protocols by setting kafka.security-protocol to SASL_PLAINTEXT or SASL_SSL, respectively.
Example configuration of the Kerberos authentication over TLS/SSL:
kafka.security-protocol=SASL_SSL
kafka.authentication.type=KERBEROS
Note
If the SASL authentication mechanism is enabled, then the SSL client authentication (2-way authentication) is disabled, but the client still verifies the server certificate (1-way authentication).
Password authentication#
Password authentication is simple username and password authentication using the SASL PLAIN mechanism.
Password authentication should only be used with SSL encryption enabled, to ensure that the password is not sent without encryption.
Add the following configuration to your catalog properties file to use the password authentication:
kafka.security-protocol=SASL_SSL
kafka.authentication.type=PASSWORD
Set the following required configuration properties:
Property name | Description
---|---
kafka.authentication.username | User name for Kafka access
kafka.authentication.password | Password for the user
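A full catalog configuration for password authentication, shown as a sketch with placeholder credentials and the property names assumed in the preceding table:
connector.name=kafka
...
kafka.security-protocol=SASL_SSL
kafka.authentication.type=PASSWORD
kafka.authentication.username=kafka_user
kafka.authentication.password=example_password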
Kerberos authentication#
The Kerberos authentication uses the Kerberos service and the SASL GSSAPI authentication mechanism to authenticate. Add the following configuration to your catalog properties file to use the Kerberos authentication mechanism:
kafka.security-protocol=SASL_SSL
kafka.authentication.type=KERBEROS
Set the following required configuration properties:
Property name | Description
---|---
kafka.authentication.client.principal | Kerberos client principal name
kafka.authentication.client.keytab | Kerberos client keytab location
kafka.authentication.config | Kerberos service file location, typically /etc/krb5.conf
kafka.authentication.service-name | Kerberos principal name of the Kafka service
Example configuration using the Kerberos authentication:
connector.name=kafka
...
kafka.security-protocol=SASL_SSL
kafka.authentication.type=KERBEROS
kafka.authentication.client.principal=kafka/broker1.your.org@YOUR.ORG
kafka.authentication.client.keytab=/etc/secrets/kafka_client.keytab
kafka.authentication.config=/etc/krb5.conf
kafka.authentication.service-name=kafka
OAuth 2.0 authentication#
The OAuth 2.0 authentication uses an access token obtained from an OAuth 2.0 compliant authorization server and the SASL OAUTHBEARER authentication mechanism to authenticate the Kafka connector. Only the client credentials flow is currently supported.
Add the following configuration to your catalog properties file to use the OAuth 2.0 authentication:
kafka.security-protocol=SASL_SSL
kafka.authentication.type=OAUTH2
Set the following required configuration properties:
Property name | Description
---|---
kafka.authentication.oauth2.token-url | The token URL of an OAuth 2.0 compliant authorization server.
kafka.authentication.oauth2.client-id | ID of the Kafka connector OAuth2 client.
kafka.authentication.oauth2.client-secret | Secret for the client.
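As an illustrative sketch, assuming the property names listed in the preceding table and a hypothetical authorization server URL, a complete catalog configuration might look like the following:
connector.name=kafka
...
kafka.security-protocol=SASL_SSL
kafka.authentication.type=OAUTH2
kafka.authentication.oauth2.token-url=https://authserver.example.com/oauth2/token
kafka.authentication.oauth2.client-id=kafka-connector-client
kafka.authentication.oauth2.client-secret=example_client_secret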
If the authorization server is using SSL with a self-signed certificate, set the additional properties to use a custom truststore while validating the certificate:
Property name | Description
---|---
kafka.authentication.oauth2.ssl.truststore.path | Location of the SSL truststore file used to verify the OAuth 2.0 authorization server certificate
kafka.authentication.oauth2.ssl.truststore.password | Password to the truststore file
kafka.authentication.oauth2.ssl.truststore.type | Type of the truststore file
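If your authorization server uses a self-signed certificate, the sketch above can be extended with the truststore settings, again assuming the property names from the table and using placeholder values:
kafka.authentication.oauth2.ssl.truststore.path=/etc/secrets/oauth2-server.truststore.jks
kafka.authentication.oauth2.ssl.truststore.password=truststore_password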
OAuth 2.0 token pass-through#
The Kafka connector supports OAuth 2.0 token pass-through.
Configure this option the same as OAuth 2.0 authentication, except for the settings described in this section.
Set the authentication type in the coordinator’s config properties file:
http-server.authentication.type=DELEGATED_OAUTH2
Additionally, enable OAUTH2_PASSTHROUGH in the properties file of the catalog using the Kafka connector:
kafka.authentication.type=OAUTH2_PASSTHROUGH
In addition, the SASL mechanism must be enabled with kafka.security-protocol=SASL_SSL or kafka.security-protocol=SASL_PLAINTEXT, as described in the previous section.
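Putting these pieces together, the catalog properties file for token pass-through contains at least the following sketch, in addition to the DELEGATED_OAUTH2 setting on the coordinator shown above:
connector.name=kafka
kafka.security-protocol=SASL_SSL
kafka.authentication.type=OAUTH2_PASSTHROUGH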
SCRAM authentication#
Salted Challenge Response Authentication Mechanism (SCRAM), or SASL/SCRAM, is a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username/password authentication like PLAIN. Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512. All examples below use SCRAM-SHA-256, but you can substitute the configuration for SCRAM-SHA-512 as needed.
Add the following configuration to your catalog properties file to use the SCRAM authentication:
kafka.security-protocol=SASL_SSL
kafka.authentication.type=SCRAM_SHA_256
Set the following required configuration properties:
Property name | Description
---|---
kafka.authentication.username | The user name.
kafka.authentication.password | The password.
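A complete catalog sketch for SCRAM-SHA-256, with placeholder credentials and the property names assumed in the preceding table:
connector.name=kafka
...
kafka.security-protocol=SASL_SSL
kafka.authentication.type=SCRAM_SHA_256
kafka.authentication.username=kafka_user
kafka.authentication.password=example_password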
Protobuf encoder#
The Protobuf encoder serializes rows to DynamicMessages as defined by the proto file.
Note
The Protobuf schema is encoded with the table column values in each Kafka message.
The dataSchema must be defined in the table definition file to use the Protobuf encoder. It points to the location of the proto file for the key or message.
Proto files can be retrieved via HTTP or HTTPS from a remote server with the following syntax:
"dataSchema": "http://example.org/schema/schema.proto"
Local files need to be available on all nodes and use an absolute path in the syntax, for example:
"dataSchema": "/usr/local/schema/schema.proto"
The following field attributes are supported:
- name - Name of the column in the SEP table.
- type - SEP type of column.
- mapping - Slash-separated list of field names to select a field from the Protobuf schema. If the field specified in mapping does not exist in the original Protobuf schema, then a write operation fails.
The following table lists supported SEP types, which can be used in type for the equivalent Protobuf field type.
SEP type | Allowed Protobuf types
---|---
BOOLEAN | bool
INTEGER | int32, uint32, sint32, fixed32, sfixed32
BIGINT | int64, uint64, sint64, fixed64, sfixed64
DOUBLE | double
REAL | float
VARCHAR | string
VARBINARY | bytes
ROW | Message
ARRAY | Protobuf type with repeated field
MAP | Map
TIMESTAMP | Timestamp, predefined in timestamp.proto
Example Protobuf field definition in a table definition file for a Kafka message:
{
"tableName": "your-table-name",
"schemaName": "your-schema-name",
"topicName": "your-topic-name",
"key": { "..." },
"message":
{
"dataFormat": "protobuf",
"dataSchema": "/message_schema.proto",
"fields":
[
{
"name": "field1",
"type": "BIGINT",
"mapping": "field1"
},
{
"name": "field2",
"type": "VARCHAR",
"mapping": "field2"
},
{
"name": "field3",
"type": "BOOLEAN",
"mapping": "field3"
}
]
}
}
Example Protobuf schema definition for the preceding table definition:
syntax = "proto3";
message schema {
uint64 field1 = 1 ;
string field2 = 2;
bool field3 = 3;
}
Example insert query for the preceding table definition:
INSERT INTO example_protobuf_table (field1, field2, field3)
VALUES (123456789, 'example text', FALSE);
Protobuf decoder#
The Protobuf decoder converts the bytes representing a message or key in Protobuf format into column values, based on a schema.
For a key or message using the protobuf decoder, the dataSchema must be defined. It points to the location of a valid proto file of the message which needs to be decoded. This location can be a remote web server, for example dataSchema: 'http://example.org/schema/schema.proto', or a local file, for example dataSchema: '/usr/local/schema/schema.proto'. The decoder fails if the location is not accessible from the coordinator.
For fields, the following attributes are supported:
- name - Name of the column in the SEP table.
- type - SEP type of column.
- mapping - Slash-separated list of field names to select a field from the Protobuf schema; see the sketch after this list. If the field specified in mapping does not exist in the original proto file, then a read operation returns NULL.
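As an illustrative sketch of the slash-separated mapping syntax, assume a proto file that contains a nested message; a column in the table definition file can then select the nested field as follows (all names are placeholders):
syntax = "proto3";
message schema {
    message Inner {
        string value = 1;
    }
    Inner outer = 1;
}
The corresponding field definition selects the nested value:
{
    "name": "outer_value",
    "type": "VARCHAR",
    "mapping": "outer/value"
}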
The following table lists supported SEP types, which can be used in type for the equivalent Protobuf field types.
SEP type | Allowed Protobuf types
---|---
BOOLEAN | bool
INTEGER | int32, uint32, sint32, fixed32, sfixed32
BIGINT | int64, uint64, sint64, fixed64, sfixed64
DOUBLE | double
REAL | float
VARCHAR | string
VARBINARY | bytes
ROW | Message
ARRAY | Protobuf data type with repeated field
MAP | Map
TIMESTAMP | Timestamp, predefined in timestamp.proto
Protobuf schema evolution#
The Protobuf decoder supports the schema evolution feature with backward compatibility. With backward compatibility, a newer schema can be used to read Protobuf serialized data created with an older schema. Any change in the Proto file must also be reflected in the topic definition file.
The schema evolution behavior is as follows:
- Column added in new schema: Data created with an older schema produces a default value when the table is using the new schema.
- Column removed in new schema: Data created with an older schema no longer outputs the data from the column that was removed.
- Column renamed in the new schema: This is equivalent to removing the column and adding a new one; data created with an older schema produces a default value when the table is using the new schema.
- Column type changed in the new schema: If the type coercion is supported by Protobuf, then the conversion happens. An error is thrown for incompatible types.
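For example, consider an illustrative evolution of the earlier schema in which a new field4 column is added; rows written with the older schema then produce Protobuf's default value (an empty string) for field4 when read with the new schema, provided the topic definition file also lists the new column:
syntax = "proto3";
message schema {
    uint64 field1 = 1;
    string field2 = 2;
    bool field3 = 3;
    string field4 = 4;
}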
Protobuf limitations#
- Protobuf-specific types like any and oneof are not supported.
- The Protobuf Timestamp type has nanosecond precision, but SEP supports decoding and encoding at microsecond precision.