Starburst Kafka connector#

The Starburst Kafka connector included in Starburst Enterprise platform (SEP) is an extended version of the Kafka connector with identical configuration and usage. It adds the following features:

  • TLS/SSL encryption (1-way SSL)

  • Additional authentication and access control mechanisms:
    • TLS/SSL authentication (2-way SSL)

    • Basic authentication

    • OAuth 2.0

    • OAuth 2.0 token pass-through

    • SCRAM

    • Kerberos

  • Encoder and decoder for protobuf messages

Requirements#

Configuration#

The connector configuration is identical to the configuration for the base Kafka connector.

A minimal configuration sets connector.name to kafka and adds the Kafka nodes and table names, as shown in the following snippet:

connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port

Security for schema registry access#

The connector supports table schema and schema registry usage, and includes a number of security-related features, detailed in the following sections.

TLS/SSL authentication#

Typically your schema registry is secured with TLS/SSL, and therefore accessed securely with the HTTPS protocol. The connector supports the 2-way authentication used by the protocol, if you enable the HTTPS protocol in your catalog properties file:

kafka.confluent-schema-registry.security-protocol=HTTPS

If the TLS certificates on the schema registry and on SEP are signed by a certificate authority, they are recognized as such, and no further configuration is necessary.

If you use a custom certificate, add the relevant certificates to a truststore and a keystore, place these files on your cluster nodes, and configure the following properties:

Truststore and keystore properties#

Property name | Description
kafka.confluent-schema-registry.ssl.truststore.location | Location of the truststore file. Absolute path, or relative path to etc.
kafka.confluent-schema-registry.ssl.truststore.password | Password to the truststore file.
kafka.confluent-schema-registry.ssl.truststore.type | File format of the truststore, JKS or PKCS12.
kafka.confluent-schema-registry.ssl.keystore.location | Location of the keystore file. Absolute path, or relative path to etc.
kafka.confluent-schema-registry.ssl.keystore.password | Password to the keystore file.
kafka.confluent-schema-registry.ssl.keystore.type | File format of the keystore, JKS or PKCS12.
kafka.confluent-schema-registry.ssl.key.password | Password of the private key stored in the keystore file.

You can use the secrets support to avoid plain text password values in the catalog file.
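As a sketch of how the properties above fit together, a catalog that accesses the schema registry over HTTPS with a custom truststore and keystore might look as follows. The file paths and password values are placeholders chosen for illustration, not defaults.

```properties
# Placeholder paths and passwords; replace them with your own values.
kafka.confluent-schema-registry.security-protocol=HTTPS
kafka.confluent-schema-registry.ssl.truststore.location=/etc/secrets/schema-registry.truststore.jks
kafka.confluent-schema-registry.ssl.truststore.password=truststore_password
kafka.confluent-schema-registry.ssl.truststore.type=JKS
kafka.confluent-schema-registry.ssl.keystore.location=/etc/secrets/schema-registry.keystore.jks
kafka.confluent-schema-registry.ssl.keystore.password=keystore_password
kafka.confluent-schema-registry.ssl.keystore.type=JKS
kafka.confluent-schema-registry.ssl.key.password=private_key_password
```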

Basic authentication#

The schema registry can be configured to require users to authenticate using a username and password via the basic HTTP authentication mechanism. The connector supports the Basic authentication used by the schema registry, if you enable the PASSWORD authentication type and relevant properties in your catalog properties file:

kafka.confluent-schema-registry.authentication.type=PASSWORD
kafka.confluent-schema-registry.authentication.username=examplename
kafka.confluent-schema-registry.authentication.password=examplepassword

Security#

The connector includes a number of security-related features, detailed in the following sections.

Password credential pass-through#

The connector supports password credential pass-through. To enable it, edit the catalog properties file to include the authentication type:

kafka.authentication.type=PASSWORD_PASS_THROUGH

For more information about configurations and limitations, see Password credential pass-through.

TLS/SSL encryption#

By default, the connector communicates with the Kafka server using the PLAINTEXT protocol, which means sent data is not encrypted. To encrypt the communication between the connector and the server, set the kafka.security-protocol configuration property to SSL.

In addition, you can set the following optional configuration properties:

Optional SSL encryption configuration properties#

Property name | Description
kafka.ssl.truststore.location | Location of the truststore file
kafka.ssl.truststore.password | Password to the truststore file
kafka.endpoint-identification-algorithm | The endpoint identification algorithm used by SEP to validate the server host name. The default value is HTTPS. SEP verifies that the broker host name matches the host name in the broker’s certificate. To disable server host name verification use disabled.

You can see a full example configuration with SSL encryption in the following snippet:

connector.name=kafka
...
kafka.security-protocol=SSL
kafka.ssl.truststore.location=/etc/secrets/kafka.broker.truststore.jks
kafka.ssl.truststore.password=truststore_password

TLS/SSL authentication#

With TLS/SSL authentication, also called 2-way authentication, the connector authenticates with the Kafka server or broker. Add the following configuration to your catalog file to use TLS/SSL:

kafka.security-protocol=SSL

You must set the following required configuration properties:

Required settings#

Property name | Description
kafka.ssl.keystore.location | Location of the keystore file
kafka.ssl.keystore.password | Password to the keystore file
kafka.ssl.key.password | Password of the private key stored in the keystore file

You can see a full example configuration using the SSL authentication in the following snippet:

connector.name=kafka
...
kafka.security-protocol=SSL
kafka.ssl.keystore.location=/etc/secrets/kafka.broker.keystore.jks
kafka.ssl.keystore.password=keystore_password
kafka.ssl.key.password=private_key_password
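In practice, 2-way authentication is often combined with the truststore settings from the TLS/SSL encryption section, so that the connector can verify the broker certificate and also present its own. A sketch of such a catalog, reusing the placeholder paths and passwords from the preceding examples:

```properties
connector.name=kafka
# Encrypt traffic and verify the broker certificate (placeholder truststore).
kafka.security-protocol=SSL
kafka.ssl.truststore.location=/etc/secrets/kafka.broker.truststore.jks
kafka.ssl.truststore.password=truststore_password
# Present a client certificate to the broker (placeholder keystore).
kafka.ssl.keystore.location=/etc/secrets/kafka.broker.keystore.jks
kafka.ssl.keystore.password=keystore_password
kafka.ssl.key.password=private_key_password
```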

SASL authentication#

With SASL authentication, the connector authenticates with the Kafka server using one of the following supported authentication mechanisms:

Authentication mechanisms#

Authentication mechanism name | Corresponding Kafka SASL mechanism | Documentation
PASSWORD | PLAIN | Password authentication
KERBEROS | GSSAPI | Kerberos authentication
OAUTH2 | OAUTHBEARER | OAuth 2.0 authentication
OAUTH2_PASSTHROUGH | OAUTHBEARER | OAuth 2.0 token pass-through
SCRAM_SHA_256 | SCRAM-SHA-256 | SCRAM authentication
SCRAM_SHA_512 | SCRAM-SHA-512 | SCRAM authentication

SASL authentication can be enabled for both PLAINTEXT and SSL protocols by setting kafka.security-protocol to SASL_PLAINTEXT and SASL_SSL respectively.

Example configuration of the Kerberos authentication over TLS/SSL:

kafka.security-protocol=SASL_SSL
kafka.authentication.type=KERBEROS

Note

If the SASL authentication mechanism is enabled, then the SSL client authentication (2-way authentication) is disabled, but the client still verifies the server certificate (1-way authentication).

Password authentication#

Password authentication is simple username and password authentication that uses the SASL PLAIN mechanism.

Use password authentication only with SSL encryption enabled, so that the password is never sent unencrypted.

Add the following configuration to your catalog properties file to use the password authentication:

kafka.security-protocol=SASL_SSL
kafka.authentication.type=PASSWORD

Set the following required configuration properties:

Required settings#

Property name | Description
kafka.authentication.username | User name for Kafka access
kafka.authentication.password | Password for the user
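Putting these settings together, a catalog using password authentication over an encrypted connection might look like the following sketch. The user name, passwords, and truststore path are placeholders, and the truststore lines are only needed if you use the optional truststore settings from the TLS/SSL encryption section.

```properties
connector.name=kafka
kafka.security-protocol=SASL_SSL
kafka.authentication.type=PASSWORD
# Placeholder credentials; consider the secrets support instead of plain text.
kafka.authentication.username=exampleuser
kafka.authentication.password=examplepassword
# Optional truststore for verifying the broker certificate (placeholder path).
kafka.ssl.truststore.location=/etc/secrets/kafka.broker.truststore.jks
kafka.ssl.truststore.password=truststore_password
```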

Kerberos authentication#

The Kerberos authentication uses the Kerberos service and the SASL GSSAPI authentication mechanism to authenticate. Add the following configuration to your catalog properties file to use the Kerberos authentication mechanism:

kafka.security-protocol=SASL_SSL
kafka.authentication.type=KERBEROS

Set the following required configuration properties:

Required settings#

Property name | Description
kafka.authentication.client.principal | Kerberos client principal name
kafka.authentication.client.keytab | Kerberos client keytab location
kafka.authentication.config | Kerberos service file location, typically /etc/krb5.conf
kafka.authentication.service-name | Kerberos principal name of the Kafka service

Example configuration using the Kerberos authentication:

connector.name=kafka
...
kafka.security-protocol=SASL_SSL
kafka.authentication.type=KERBEROS
kafka.authentication.client.principal=kafka/broker1.your.org@YOUR.ORG
kafka.authentication.client.keytab=/etc/secrets/kafka_client.keytab
kafka.authentication.config=/etc/krb5.conf
kafka.authentication.service-name=kafka

OAuth 2.0 authentication#

The OAuth 2.0 authentication uses an access token obtained from an OAuth 2.0 compliant authorization server and SASL OAUTHBEARER authentication mechanism to authenticate the Kafka connector. Only the client credentials flow is currently supported.

Add the following configuration to your catalog properties file to use the OAuth 2.0 authentication:

kafka.security-protocol=SASL_SSL
kafka.authentication.type=OAUTH2

Set the following required configuration properties:

Required settings#

Property name | Description
kafka.authentication.oauth2.token-url | The token URL of an OAuth 2.0 compliant authorization server.
kafka.authentication.oauth2.client-id | ID of the Kafka connector OAuth2 client.
kafka.authentication.oauth2.client-secret | Secret for the client.

If the authorization server is using SSL with a self-signed certificate, set the additional properties to use a custom truststore while validating the certificate:

Additional settings#

Property name | Description
kafka.authentication.oauth2.ssl.truststore.path | Location of the SSL truststore file used to verify the OAUTH2 authorization server certificate
kafka.authentication.oauth2.ssl.truststore.password | Password to the truststore file
kafka.authentication.oauth2.ssl.truststore.type | Type of the truststore file, supported values are: JKS and PKCS12
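The following sketch combines the required and additional OAuth 2.0 settings in one catalog. The token URL, client ID, client secret, and truststore path are hypothetical values for illustration. The three truststore lines apply only when the authorization server uses a self-signed certificate.

```properties
connector.name=kafka
kafka.security-protocol=SASL_SSL
kafka.authentication.type=OAUTH2
# Hypothetical authorization server and client credentials.
kafka.authentication.oauth2.token-url=https://auth.example.org/oauth2/token
kafka.authentication.oauth2.client-id=example-client-id
kafka.authentication.oauth2.client-secret=example-client-secret
# Only for a self-signed authorization server certificate (placeholder path).
kafka.authentication.oauth2.ssl.truststore.path=/etc/secrets/oauth2.truststore.jks
kafka.authentication.oauth2.ssl.truststore.password=truststore_password
kafka.authentication.oauth2.ssl.truststore.type=JKS
```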

OAuth 2.0 token pass-through#

The Kafka connector supports OAuth 2.0 token pass-through.

Configure this option the same as OAuth 2.0 authentication, except for the settings described in this section.

Set the authentication type in the coordinator’s config properties file:

http-server.authentication.type=DELEGATED_OAUTH2

Additionally enable OAUTH2_PASSTHROUGH in the catalog properties file using the Kafka connector:

kafka.authentication.type=OAUTH2_PASSTHROUGH

In addition, the SASL mechanism must be enabled with kafka.security-protocol=SASL_SSL or kafka.security-protocol=SASL_PLAINTEXT as described in the previous section.

SCRAM authentication#

Salted Challenge Response Authentication Mechanism (SCRAM), or SASL/SCRAM, is a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username/password authentication like PLAIN. Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512. All examples below use SCRAM-SHA-256, but you can substitute the configuration for SCRAM-SHA-512 as needed.

Add the following configuration to your catalog properties file to use the SCRAM authentication:

kafka.security-protocol=SASL_SSL
kafka.authentication.type=SCRAM_SHA_256

Set the following required configuration properties:

Required settings#

Property name | Description
kafka.authentication.username | The user name.
kafka.authentication.password | The password.
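As a sketch of the SCRAM-SHA-512 substitution mentioned above, only the authentication type changes compared to the SCRAM-SHA-256 configuration. The user name and password are placeholders.

```properties
connector.name=kafka
kafka.security-protocol=SASL_SSL
# SCRAM_SHA_512 instead of SCRAM_SHA_256; all other settings are unchanged.
kafka.authentication.type=SCRAM_SHA_512
# Placeholder credentials.
kafka.authentication.username=exampleuser
kafka.authentication.password=examplepassword
```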

Protobuf encoder#

The Protobuf encoder serializes rows to DynamicMessages as defined by the proto file.

Note

The Protobuf schema is encoded with the table column values in each Kafka message.

The dataSchema must be defined in the table definition file to use the Protobuf encoder. It points to the location of the proto file for the key or message.

Proto files can be retrieved via HTTP or HTTPS from a remote server with the following syntax:

"dataSchema": "http://example.org/schema/schema.proto"

Local files need to be available on all nodes and use an absolute path in the syntax, for example:

"dataSchema": "/usr/local/schema/schema.proto"

The following field attributes are supported:

  • name - Name of the column in the SEP table.

  • type - SEP type of column.

  • mapping - slash-separated list of field names to select a field from the Protobuf schema. If the field specified in mapping does not exist in the original Protobuf schema, then a write operation fails.
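To illustrate the slash-separated mapping, the following sketch assumes a hypothetical proto file in which the message contains a nested message field named address with a string field named city. Neither name comes from the example schema in this section. A field entry in the table definition file could then select the nested value like this:

```json
{
  "name": "city",
  "type": "VARCHAR",
  "mapping": "address/city"
}
```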

The following table lists supported SEP types, which can be used in type for the equivalent Protobuf field type.

SEP to Protobuf type mapping for encoding#

SEP type | Allowed Protobuf types
BOOLEAN | bool
INTEGER | int32, uint32, sint32, fixed32, sfixed32
BIGINT | int64, uint64, sint64, fixed64, sfixed64
DOUBLE | double
REAL | float
VARCHAR / VARCHAR(x) | string
VARBINARY | bytes
ROW | Message
ARRAY | Protobuf type with repeated field
MAP | Map
TIMESTAMP | Timestamp, predefined in proto file

Example Protobuf field definition in a table definition file for a Kafka message:

{
  "tableName": "your-table-name",
  "schemaName": "your-schema-name",
  "topicName": "your-topic-name",
  "key": { "..." },
  "message":
  {
    "dataFormat": "protobuf",
    "dataSchema": "/message_schema.proto",
    "fields":
    [
      {
        "name": "field1",
        "type": "BIGINT",
        "mapping": "field1"
      },
      {
        "name": "field2",
        "type": "VARCHAR",
        "mapping": "field2"
      },
      {
        "name": "field3",
        "type": "BOOLEAN",
        "mapping": "field3"
      }
    ]
  }
}

Example Protobuf schema definition for the preceding table definition:

syntax = "proto3";

message schema {
  uint64 field1 = 1;
  string field2 = 2;
  bool field3 = 3;
}

Example insert query for the preceding table definition:

INSERT INTO example_protobuf_table (field1, field2, field3)
  VALUES (123456789, 'example text', FALSE);

Protobuf decoder#

The Protobuf decoder decodes the bytes representing a Protobuf-formatted message or key based on a schema.

To use the Protobuf decoder for a key or message, you must define the dataSchema. It points to the location of a valid proto file for the message to decode. This location can be a remote web server, dataSchema: 'http://example.org/schema/schema.proto', or a local file, dataSchema: '/usr/local/schema/schema.proto'. The decoder fails if the location is not accessible from the coordinator.

For fields, the following attributes are supported:

  • name - Name of the column in the SEP table.

  • type - SEP type of column.

  • mapping - slash-separated list of field names to select a field from the Protobuf schema. If the field specified in mapping does not exist in the original proto file, then a read operation returns NULL.

The following table lists supported SEP types, which can be used in type for the equivalent Protobuf field types.

SEP to Protobuf type mapping for decoding#

SEP type | Allowed Protobuf types
BOOLEAN | bool
INTEGER | int32, uint32, sint32, fixed32, sfixed32
BIGINT | int64, uint64, sint64, fixed64, sfixed64
DOUBLE | double
REAL | float
VARCHAR / VARCHAR(x) | string
VARBINARY | bytes
ROW | Message
ARRAY | Protobuf data type with repeated field
MAP | map
TIMESTAMP | Timestamp (predefined proto file)

Protobuf schema evolution#

The Protobuf decoder supports the schema evolution feature with backward compatibility. With backward compatibility, a newer schema can be used to read Protobuf serialized data created with an older schema. Any change in the Proto file must also be reflected in the topic definition file.

The schema evolution behavior is as follows:

  • Column added in new schema: Data created with an older schema produces a default value, when the table is using the new schema.

  • Column removed in new schema: Data created with an older schema no longer outputs the data from the column that was removed.

  • Column renamed in new schema: This is equivalent to removing the column and adding a new one. Data created with an older schema produces a default value when the table is using the new schema.

  • Changing type of column in the new schema: If the type coercion is supported by Protobuf, then the conversion happens. An error is thrown for incompatible types.
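As an illustration of the first case, assume the example schema from the encoder section is extended with a hypothetical field4. Messages written with the older three-field schema remain readable, and field4 yields its default value for those messages. The matching column must also be added to the topic definition file.

```protobuf
syntax = "proto3";

message schema {
  uint64 field1 = 1;
  string field2 = 2;
  bool field3 = 3;
  // Hypothetical field added in the new schema; older messages do not contain it.
  int32 field4 = 4;
}
```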

Protobuf limitations#

  • Protobuf-specific types such as any and oneof are not supported.

  • Protobuf Timestamp has nanosecond precision, but SEP supports decoding and encoding at microsecond precision.