Kafka Connector

Last modified 13 Apr 2021 17:52 +02:00

Identity connector for a special integration pattern using Kafka.

Functionality: experimental
Development status: dormant (not developed actively, but still somehow maintained)
Support status: unknown
Origin: Evolveum
Support provided by: Evolveum
Target systems: Kafka server
Source code: https://github.com/Evolveum/connector-kafka

The Kafka connector was implemented for a Kafka server whose data is formatted according to an Avro schema from a Schema Registry server. The connector was tested with midPoint 4.1. It can work as a Consumer (only reads data from a topic), as a Producer (only writes data to a topic), or as both (writes data to one topic and reads data from another topic).

Capabilities and Features

Provisioning: YES
Live Synchronization: YES (the connector does not support the typical read operation; identities are read through Live Synchronization)
Password: YES
Activation: YES
Paging support: NO
Scripting: NO

Versions

ONLY SNAPSHOT

Licensing

The connector itself is available under the terms of the Apache License 2.0. To the best of our knowledge, no extra license is needed to use this connector. (Additional licensing terms and conditions may apply to the services with which the connector is used.)

Known limitations

The following limitations were identified while this connector was being developed:

  • The typical read operation is not supported; identities are read through Live Synchronization.

  • The connector supports createOp, updateDeltaOp and DeleteOp. However, updateDeltaOp needs the whole object, that is, all attributes, both changed and unchanged.

  • Avro schemas allow more types, but midPoint supports only primitive types (boolean, double, bytes, float, int, long and string), arrays of a primitive type, records, and unions of two types where one is null and the other is an array or a primitive type. The enum, map and fixed types are therefore not supported.
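The type rules in the last limitation can be expressed as a small check. The following Python sketch is illustrative only and is not taken from the connector source. The function names are hypothetical, the recursive check of nested record fields is an assumption, and primitives written in object form (such as {"type": "string"}) are not handled.

```python
# Primitive Avro types that the text above lists as supported by midPoint.
PRIMITIVES = {"boolean", "double", "bytes", "float", "int", "long", "string"}


def is_primitive(avro_type):
    return isinstance(avro_type, str) and avro_type in PRIMITIVES


def is_primitive_array(avro_type):
    return (
        isinstance(avro_type, dict)
        and avro_type.get("type") == "array"
        and is_primitive(avro_type.get("items"))
    )


def is_supported(avro_type):
    """Return True if the parsed Avro type matches one of the supported shapes."""
    if is_primitive(avro_type) or is_primitive_array(avro_type):
        return True
    if isinstance(avro_type, dict) and avro_type.get("type") == "record":
        # Assumption: a record is accepted only if all of its fields are.
        return all(is_supported(f["type"]) for f in avro_type.get("fields", []))
    if isinstance(avro_type, list) and len(avro_type) == 2 and "null" in avro_type:
        # Union of exactly two types, one of which is null.
        others = [t for t in avro_type if t != "null"]
        return len(others) == 1 and (
            is_primitive(others[0]) or is_primitive_array(others[0])
        )
    return False
```

For instance, `is_supported(["int", "null"])` is accepted, while an enum, a map, or a union of two non-null types such as `["string", "int"]` is rejected.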

Schema

The connector schema is generated from the Avro schema. When the connector is only a Consumer, the Avro schema is obtained from the Schema Registry server. When the connector is a Producer, the schema is read from a file and pushed to the Schema Registry server. An attribute that has a default value in the Avro schema is optional; otherwise it is required. For example:

Example of avro schema
{
  "namespace": "com.evolveum.test",
  "type": "record",
  "name": "test_schema",
  "fields": [
    {
      "name": "username",
      "type": "string"
    },
    {
      "name": "full_name",
      "type": ["string", "null"],
      "default": null
    },
    {
      "name": "favorite_number",
      "type": ["int", "null"],
      "default": null
    },
    {
      "name": "favorite_color_array",
      "type": [
        {
          "type": "array",
          "items": "string"
        },
        "null"
      ],
      "default": null
    },
    {
      "name": "address",
      "type": {
        "name": "address_insade",
        "type": "record",
        "fields": [
          {
            "name": "street",
            "type": ["string", "null"],
            "default": null
          },
          {
            "name": "number",
            "type": ["int", "null"],
            "default": null
          }
        ]
      }
    }
  ]
}
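The optional/required rule can be illustrated with a short Python sketch. It is not the connector's implementation: the function name is hypothetical, only top-level fields are inspected, and the schema is a shortened copy of the example (how the connector treats the nested address record is not shown here).

```python
import json

# Shortened copy of the example schema (the array and nested record fields are omitted).
SCHEMA_JSON = """
{
  "namespace": "com.evolveum.test",
  "type": "record",
  "name": "test_schema",
  "fields": [
    {"name": "username", "type": "string"},
    {"name": "full_name", "type": ["string", "null"], "default": null},
    {"name": "favorite_number", "type": ["int", "null"], "default": null}
  ]
}
"""


def split_required_optional(schema):
    """Return (required, optional) lists of top-level field names."""
    required, optional = [], []
    for field in schema["fields"]:
        # Test for the presence of the "default" key, not its value:
        # a default of null (None after parsing) still counts as a default.
        if "default" in field:
            optional.append(field["name"])
        else:
            required.append(field["name"])
    return required, optional


required, optional = split_required_optional(json.loads(SCHEMA_JSON))
print("required:", required)  # required: ['username']
print("optional:", optional)  # optional: ['full_name', 'favorite_number']
```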

Certificate Renewal

This connector supports automatic renewal of the certificate and private key used for communication with the Kafka server and the Schema Registry server.

Configuration

Schema Registry

Name | Description | Required | Type
schemaRegistryUrl | URL of the Schema Registry to which this client connects, for example http://localhost:9090/api/v1 | true | String
pathToMorePropertiesForSchemaRegistry | Path to a file with additional configuration properties for the Schema Registry client | false | String
schemaRegistrySslProtocol | SSL protocol for the Schema Registry | false | String

Certificate Renewal

Name | Description | Required | Type
ssoUrlRenewal | URL of the SSO service for the certificate renewal service | false | String
serviceUrlRenewal | URL of the certificate renewal service | false | String
usernameRenewal | Username for authentication to the SSO service | false | String
passwordRenewal | Password for authentication to the SSO service | false | GuardedString
clientIdRenewal | Client ID for authentication to the SSO service | false | String
intervalForCertificateRenewal | Interval in minutes that defines how long before its expiration the certificate is renewed. It has no default value; if it is not set, only the current time is compared with the expiration time. | false | Integer
sslPrivateKeyEntryAlias | Alias of the private key in the keystore | false | String
sslPrivateKeyEntryPassword | Password of the private key in the keystore | false | GuardedString
sslTrustCertificateAliasPrefix | Prefix with which the alias of every certificate to be renewed must start. The suffix is a number starting from 0. For example, if the prefix is 'caroot', the aliases must be 'caroot0', 'caroot1', 'caroot2' and so on. If a number is missing, the aliases after it are not processed. | false | String
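The alias numbering and the renewal interval described above can be sketched in Python as follows. This is an illustration of the documented rules, not code from the connector. The function names are hypothetical, and the exact comparison used at the boundary (here "greater than or equal") is an assumption.

```python
from datetime import datetime, timedelta


def aliases_to_renew(prefix, keystore_aliases):
    """Collect prefix0, prefix1, ... and stop at the first missing number."""
    present = set(keystore_aliases)
    found = []
    index = 0
    while f"{prefix}{index}" in present:
        found.append(f"{prefix}{index}")
        index += 1
    return found


def renewal_due(now, expires_at, interval_minutes=None):
    """Return True when the certificate should be renewed.

    Without a configured interval, only the current time is compared
    with the expiration time.
    """
    if interval_minutes is None:
        return now >= expires_at
    return now >= expires_at - timedelta(minutes=interval_minutes)


# 'caroot3' is skipped because 'caroot2' is missing.
print(aliases_to_renew("caroot", ["caroot0", "caroot1", "caroot3"]))
# ['caroot0', 'caroot1']
```

With an expiration time of 12:00 and an interval of 30 minutes, `renewal_due` in this sketch returns True from 11:30 onward. Without an interval it returns True only once 12:00 is reached.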

Common Properties for Consumer and Producer

Name | Description | Required | Type
useOfConnector | Mode in which the connector is used. A Consumer reads data from the Kafka server and a Producer writes data to the Kafka server. Possible values are 'CONSUMER', 'PRODUCER' and 'CONSUMER_AND_PRODUCER' (both Consumer and Producer). | true | String
uniqueAttribute | Name of the unique attribute in the Avro schema. | true | String
nameAttribute | Name attribute for an account in the resource. In most cases it is the same as the unique attribute, but it can differ. | false | String
passwordAttribute | Password attribute for an account in the resource. | false | String
bootstrapServers | Comma-separated list of host and port pairs that are the addresses of the Kafka brokers. | true | String
nameOfSchema | Name of the Avro schema to use. When the connector is only a Consumer, the schema is obtained from the Schema Registry server. When the connector is a Producer, the schema is read from a file and pushed to the Schema Registry server. | true | String
kafkaSecurityProtocol | Security protocol for the Kafka server. | false | String
sslKeyStoreType | SSL key store type used for the Kafka server and the Schema Registry server. | false | String
sslKeyStorePath | SSL key store path used for the Kafka server and the Schema Registry server. | false | String
sslKeyStorePassword | SSL key store password used for the Kafka server and the Schema Registry server. | false | GuardedString
sslKeyStoreProvider | SSL key store provider used for the Kafka server and the Schema Registry server. | false | String
sslKeyPassword | SSL key password used for the Kafka server and the Schema Registry server. | false | GuardedString
sslKeyManagerFactoryProvider | SSL key manager factory provider used for the Kafka server and the Schema Registry server. | false | String
sslKeyManagerFactoryAlgorithm | SSL key manager factory algorithm used for the Kafka server and the Schema Registry server. | false | String
sslTrustStoreType | SSL trust store type used for the Kafka server and the Schema Registry server. | false | String
sslTrustStorePath | SSL trust store path used for the Kafka server and the Schema Registry server. | false | String
sslTrustStorePassword | SSL trust store password used for the Kafka server and the Schema Registry server. | false | GuardedString
sslTrustStoreProvider | SSL trust store provider used for the Kafka server and the Schema Registry server. | false | String
sslTrustManagerFactoryProvider | SSL trust manager factory provider used for the Kafka server and the Schema Registry server. | false | String
sslTrustManagerFactoryAlgorithm | SSL trust manager factory algorithm used for the Kafka server and the Schema Registry server. | false | String
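As a rough illustration of the value formats for useOfConnector and bootstrapServers, the following Python sketch validates both. The helper names are hypothetical and the connector itself may validate these values differently. Bare host names without a port are rejected here by assumption.

```python
# Values listed for the useOfConnector property.
VALID_USES = {"CONSUMER", "PRODUCER", "CONSUMER_AND_PRODUCER"}


def check_use_of_connector(value):
    """Return the value unchanged if it is one of the documented modes."""
    if value not in VALID_USES:
        raise ValueError(f"useOfConnector must be one of {sorted(VALID_USES)}")
    return value


def parse_bootstrap_servers(value):
    """Split 'host1:9092,host2:9092' into [(host, port), ...]."""
    pairs = []
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue
        # Split on the last colon so the port is always the final part.
        host, sep, port = item.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {item!r}")
        pairs.append((host, int(port)))
    if not pairs:
        raise ValueError("bootstrapServers must list at least one broker")
    return pairs


print(parse_bootstrap_servers("kafka1:9092, kafka2:9093"))
# [('kafka1', 9092), ('kafka2', 9093)]
```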

Consumer

Properties marked as required below are required only when the connector is used as a Consumer.

Name | Description | Required | Type
consumerNameOfTopic | Name of the topic from which the connector reads. | true | String
consumerVersionOfSchema | Version of the Avro schema that the connector uses. If the connector is a Producer, this property is updated automatically. | true | Integer
consumerGroupId | A unique string that identifies the consumer group this consumer belongs to. | true | String
consumerPartitionOfTopic | Partitions of the topic from which the connector reads, given as a comma-separated list, for example '1,2,3,5-7'. Default value is 0. | false | String
consumerDurationIfFail | The time, in minutes, spent waiting in poll if data is not available in the buffer. Default value is 2. | false | Integer
consumerMaxRecords | The maximum number of records returned in a single call. | false | Integer
pathToMorePropertiesForConsumer | Path to a file with additional configuration properties for the Consumer. | false | String
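The consumerPartitionOfTopic format mixes single numbers and ranges. A minimal Python sketch of how such a value could be expanded is shown below. The function name is hypothetical, and treating ranges as inclusive on both ends is an assumption based on the '5-7' example.

```python
def parse_partitions(value="0"):
    """Expand a list such as '1,2,3,5-7' into [1, 2, 3, 5, 6, 7]."""
    partitions = []
    for part in value.split(","):
        part = part.strip()
        if "-" in part:
            # A range such as '5-7' is treated as inclusive on both ends.
            start, end = (int(x) for x in part.split("-", 1))
            if start > end:
                raise ValueError(f"invalid range {part!r}")
            partitions.extend(range(start, end + 1))
        else:
            partitions.append(int(part))
    return partitions


print(parse_partitions("1,2,3,5-7"))  # [1, 2, 3, 5, 6, 7]
print(parse_partitions())             # [0]
```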

Producer

Properties marked as required below are required only when the connector is used as a Producer.

Name | Description | Required | Type
producerPathToFileContainingSchema | Path to the file that contains the Avro schema. | true | String
producerNameOfTopic | Name of the topic to which the connector writes. | true | String
pathToMorePropertiesForProducer | Path to a file with additional configuration properties for the Producer. | false | String
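Because the Consumer and Producer properties are required only in the corresponding mode, a configuration check depends on useOfConnector. The following Python sketch collects the required property names from the tables on this page and reports the missing ones. The function name and the dictionary-based configuration are hypothetical and serve only as an illustration.

```python
# Required properties as listed in the tables on this page.
COMMON_REQUIRED = [
    "schemaRegistryUrl",
    "useOfConnector",
    "uniqueAttribute",
    "bootstrapServers",
    "nameOfSchema",
]
CONSUMER_REQUIRED = ["consumerNameOfTopic", "consumerVersionOfSchema", "consumerGroupId"]
PRODUCER_REQUIRED = ["producerPathToFileContainingSchema", "producerNameOfTopic"]


def missing_properties(config):
    """Return the names of required properties that are absent or empty."""
    use = config.get("useOfConnector")
    required = list(COMMON_REQUIRED)
    if use in ("CONSUMER", "CONSUMER_AND_PRODUCER"):
        required += CONSUMER_REQUIRED
    if use in ("PRODUCER", "CONSUMER_AND_PRODUCER"):
        required += PRODUCER_REQUIRED
    return [name for name in required if config.get(name) in (None, "")]


config = {
    "schemaRegistryUrl": "http://localhost:9090/api/v1",
    "useOfConnector": "PRODUCER",
    "uniqueAttribute": "username",
    "bootstrapServers": "localhost:9092",  # example address, not from this page
    "nameOfSchema": "test_schema",
}
print(missing_properties(config))
# ['producerPathToFileContainingSchema', 'producerNameOfTopic']
```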