Motivation

Today we ingest a number of high-cardinality metrics into Druid across many
dimensions. These metrics are rolled up on a per-minute basis and are very
useful when looking at metrics on a per-partition or per-client basis. Events
are another class of data that provides useful information about a particular
incident or scenario inside a Kafka cluster. Events themselves are carried
inside the Kafka payload, but there is also some very useful metadata carried
in Kafka headers that can serve as dimensions for aggregation and, in turn,
bring better insights.

PR #10730 <https://github.com/apache/druid/pull/10730> introduced support
for Kafka headers in InputFormats.

We still need an input format to parse out the headers and translate them
into relevant columns in Druid. Until that is implemented, none of the
information available in the Kafka message headers is exposed. So first we
need to implement an input format that can parse headers in any given format
(provided we support that format), just as we parse payloads today. Apart
from the headers, there is also useful information present in the key portion
of the Kafka record, and we need a way to expose the data present in the key
as Druid columns. Finally, we need a generic way to express at configuration
time which attributes from the headers, key, and payload should be ingested
into Druid, and the design should be generic enough that users can specify
different parsers for the headers, key, and payload.

The proposal is to design an input format that solves the above by wrapping
any existing input formats and merging the data into a single unified Druid
row.
Proposed changes

Let's look at a sample input format from the above discussion:

*"inputFormat":{        "type": "kafka", // New input format type
"headerLabelPrefix": "kafka.header.", // Label prefix for header columns,
this will avoid collisions while merging columns
"recordTimestampLabelPrefix": "kafka.", // Kafka record's timestamp is made
available in case payload does not carry timestamp        "headerFormat":
// Header parser specifying that values are of type string        {
      "type": "string"        },       "valueFormat": // Value parser from
json parsing       {             "type": "json",             "flattenSpec":
{                     "useFieldDiscovery": true,
"fields": [...]             }        },        "keyFormat": // Key parser
also from json parsing         {             "type": "json"         }}*

Since we have independent sections for the header, key, and payload, each
section can be parsed with its own parser, e.g., headers coming in as strings
and the payload as JSON.

KafkaInputFormat (the new InputFormat class) will be the uber class
extending the InputFormat interface. It will be responsible for creating the
individual parsers for the header, key, and payload, blending the data while
resolving column conflicts, and generating a single unified InputRow for
Druid ingestion.
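
To make the merge concrete, here is a rough sketch of how a single record
could map to a unified row under the sample configuration above. The record
contents, header names, and payload fields are invented for illustration,
and the exact column name used for the record timestamp is an assumption
based on the "kafka." prefix:

// Hypothetical incoming Kafka record (all values invented for illustration)
headers: env=prod, source=billing-service
key:     {"clientId": "client-42"}
value:   {"timestamp": "2021-08-20T10:00:00Z", "partition": 3, "latencyMs": 12}

// Sketch of the unified Druid row produced by the wrapper
{
  "kafka.header.env":    "prod",                 // header, prefixed with "kafka.header."
  "kafka.header.source": "billing-service",
  "kafka.key":           "client-42",            // key, exposed as "kafka.key" (see below)
  "kafka.timestamp":     1629453600000,          // assumed name for the record timestamp, per the "kafka." prefix
  "timestamp":           "2021-08-20T10:00:00Z", // payload fields, flattened by the valueFormat
  "partition":           3,
  "latencyMs":           12
}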

"headerFormat" will allow users to plug in a parser type for the header
values and will add the default header prefix as "kafka.header."(can be
overridden) for attributes to avoid collision while merging attributes with
payload.
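
As a hypothetical example of why the prefix matters (field names invented
here): if the payload and a header both carry an attribute named "version",
the prefix keeps the two columns apart after the merge:

// Hypothetical collision between a payload field and a header name
payload: {"version": "1.4.2", ...}
header:  version=42

// After merging
{
  "version":              "1.4.2",   // from the payload
  "kafka.header.version": "42"       // from the header, kept separate by the prefix
}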

The Kafka payload parser will be responsible for parsing the value portion
of the Kafka record. This is where most of the data will come from, and we
should be able to plug in existing parsers. One thing to note here is that if
batching is performed (a single Kafka record's value producing multiple
rows), then the code should augment every row in the batch with the header
and key values.
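
A hypothetical sketch of that rule (payload contents invented for
illustration): a record whose value parses into two rows yields two Druid
rows, each carrying the same header and key columns:

// One Kafka record whose value parses into two rows (hypothetical payload)
headers: env=prod
key:     client-42
value:   [{"metric": "bytes-in", "count": 10}, {"metric": "bytes-out", "count": 7}]

// Resulting rows: the header and key columns are repeated on each row
{"kafka.header.env": "prod", "kafka.key": "client-42", "metric": "bytes-in",  "count": 10}
{"kafka.header.env": "prod", "kafka.key": "client-42", "metric": "bytes-out", "count": 7}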

The Kafka key parser will handle parsing the key portion of the Kafka record
and will ingest the key under the dimension name "kafka.key".
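
The merged columns can then be referenced like any other Druid columns. A
minimal sketch of a dimensionsSpec that picks them up, assuming the column
names from the sample configuration above and the invented payload fields
from the earlier example:

"dimensionsSpec":
{
  "dimensions": [
    "kafka.header.env",
    "kafka.header.source",
    "kafka.key",
    "partition",
    "latencyMs"
  ]
}
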
Operational impact, Test plan & Future work

Since we had an immediate need to ingest blended data from the header and
payload, we have implemented the above proposal in a PR:
<https://github.com/apache/druid/pull/11630>
-Lokesh Lingarajan
