Elastic Cloud Kafka Sink Setup and Usage

Daniel

February 26, 2021

"Elastic cloud kafka sink setup and usage"

Recently we adopted Elastic Cloud, using Elasticsearch to query tag data. We needed a way to ingest data into Elasticsearch from our existing systems. We currently use Kafka, hosted in Confluent Cloud, in our services to stream data via its producer and consumer functionality. We found that Confluent Cloud has an Elasticsearch Sink Connector that can sink data directly from a Kafka topic into an index or data stream without needing to call Elastic's REST APIs directly. This is how we set it up and the considerations we had along the way.

Mappings

The first thing to consider is the Elastic mappings. The Elasticsearch Sink Connector can use dynamic mappings on data ingest; however, for data such as maps, where there can be an indeterminate number of keys, you might consider creating your own mapping so as to avoid a "mapping explosion" later down the line.

Another important point regarding map data is the configuration option "Compact map entries". This defines how map data is ingested and is useful for nested types and for avoiding the mapping explosion mentioned above. Suppose we have a map like this one (the keys and values here are just an illustration):
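
    {
      "host": "int001",
      "region": "eu-west-1"
    }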

If we set the value of "Compact map entries" to true, the map will be ingested as-is and each key may get its own mapping in Elastic.

If we set the value of this config to false, the map entries are instead written as nested documents:
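
    [
      { "key": "host", "value": "int001" },
      { "key": "region", "value": "eu-west-1" }
    ]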

The above is useful as you do not need to transform the map yourself before sending it to a Kafka stream.
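
For reference, here is a rough sketch of what an explicit mapping for those nested entries could look like, created before starting the connector. The index name has to match the Kafka topic (more on that below), and the field name "labels" is just an illustration:

    PUT /my-topic
    {
      "mappings": {
        "properties": {
          "labels": {
            "type": "nested",
            "properties": {
              "key":   { "type": "keyword" },
              "value": { "type": "keyword" }
            }
          }
        }
      }
    }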

Configuration

Now, assuming we have our target data in a Kafka stream, let's configure the connector:

Connection

  • Topics

          - This is the list of Kafka topics to sink into Elastic. An important note: the name of the Elastic index or data stream must be the same as the Kafka topic name.

  • Input message format

          - The format of the message, e.g. JSON or Avro. This has no effect on Elastic itself but is needed for the connector to read the data from the Kafka stream.

  • Kafka API Key & Kafka API Secret

          - This allows the connector to read from the stream. These can be auto-generated in the UI.

  • Connection URI

           - This is the connection URI of Elasticsearch, not Kibana or the cluster management page.

  • Connection User & Password

          - The user credentials for the connector to authenticate with Elastic. Ensure the user has sufficient permissions to write to the index or data stream, as well as to create indexes, depending on your use case (a sketch of a suitable role follows below).
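
If you want to scope the connector down rather than reuse an existing superuser, a minimal sketch using the Elasticsearch security APIs could look like the following. The role, user and index names are illustrative and the exact privileges depend on your use case:

    PUT /_security/role/kafka-sink-writer
    {
      "indices": [
        {
          "names": ["my-topic"],
          "privileges": ["create_index", "write"]
        }
      ]
    }

    PUT /_security/user/kafka-sink
    {
      "password": "<choose-a-strong-password>",
      "roles": ["kafka-sink-writer"]
    }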

Data conversion

  • Type name

          - Types were deprecated in Elasticsearch 6.0.0, so we set this value to "_doc". If you use an older version of Elasticsearch and still use types, you can set this value to something else.

  • Ignore Key

          - If this is set to "false", the Elasticsearch document ID will be the Kafka record key; otherwise the connector uses a combination of topic, partition and offset for the document ID. This is purely a matter of preference, depending on whether you have a record key you want to use as the document ID.

  • Schema Ignore

          - If this is set to true, mappings will be inferred using Elastic's dynamic mappings. As we usually set our own mappings beforehand, this can be set to false.

  • Compact map entries

          - As mentioned above, this can transform map entries into nested documents of the form '{"key": "host", "value": "int001"}'. This option depends on your Elastic mappings and data structure. As we use nested types, I'll set this option to false so it transforms the data for me.

Error handling

  • Behaviour on null values

          - Controls what happens with Kafka records that have null keys or values, such as tombstone records. Usually we do not care about such records and they can be ignored. There is also the option to fail the connector if they are needed, or to delete the corresponding documents entirely.

  • Behaviour on malformed documents

          - How to handle documents that Elasticsearch rejects, for example due to mapping conflicts or illegal fields. You can fail the connector or ignore the documents. Again, this depends on how critical it is that every document gets ingested.

  • Drop invalid messages

        - If a Kafka record cannot be converted to an Elastic document, should it be dropped?

Connection Details

The rest of the connection details cover batching options and compression, which reduce the number of requests sent to Elastic. These all depend on your flow of data but can be left at their defaults to start with. The final option is:

  • Auto create indices on start

          - Should the Elastic indexes be created on connector start if they are not present? This can be set to false if we have already created our index with our mappings.
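
Putting the options together, the saved configuration ends up along these lines. This is only a sketch: the property names below follow the Elasticsearch Sink Connector documentation and may differ slightly from the UI labels, and the endpoint, credentials and topic are placeholders:

    {
      "connector.class": "ElasticsearchSink",
      "name": "elasticsearch-sink",
      "topics": "my-topic",
      "input.data.format": "JSON",
      "kafka.api.key": "<kafka-api-key>",
      "kafka.api.secret": "<kafka-api-secret>",
      "connection.url": "https://<your-elasticsearch-endpoint>:9243",
      "connection.username": "kafka-sink",
      "connection.password": "<password>",
      "type.name": "_doc",
      "key.ignore": "true",
      "schema.ignore": "false",
      "compact.map.entries": "false",
      "behavior.on.null.values": "ignore",
      "behavior.on.malformed.documents": "ignore",
      "drop.invalid.message": "true",
      "auto.create.indices.at.start": "false",
      "tasks.max": "1"
    }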

With that, we can save the Elasticsearch Sink Connector configuration and we should see data being sent from Confluent Kafka to Elasticsearch.
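
A quick way to verify is to query the index (named after the topic) directly, for example from the Kibana Dev Tools console; "my-topic" is again a placeholder:

    GET /my-topic/_count

    GET /my-topic/_search?size=1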

Conclusion

We have talked about how to set up the Elasticsearch Sink Connector to send data from the Confluent Cloud platform to the Elastic Cloud platform. With the above settings and considerations, data can be sent easily from Kafka to Elasticsearch without having to use the Elastic REST APIs directly.

Daniel

I am a Software Engineer working as part of the nerd.vision team, mainly working on the backend systems and agents. When I'm not squashing bugs, I enjoy travelling, gaming and experiencing new foods and cultures.