I'm not a coder, but wanted to say that I have heard other druid users ask for this functionality, so I think it would be useful. Thank you!
On Tue, Sep 7, 2021 at 10:09 AM Lokesh Lingarajan <llingara...@confluent.io.invalid> wrote:

Hope everyone had a good long weekend. Any updates/comments?

-Lokesh

On Mon, Aug 30, 2021 at 2:43 PM Lokesh Lingarajan <llingara...@confluent.io> wrote:

Motivation

Today we ingest a number of high-cardinality metrics into Druid across dimensions. These metrics are rolled up on a per-minute basis and are very useful when looking at metrics on a partition or client basis. Events are another class of data that provides useful information about a particular incident/scenario inside a Kafka cluster. Events themselves are carried inside the Kafka payload, but there is also some very useful metadata carried in the Kafka headers that can serve as useful dimensions for aggregation and, in turn, bring better insights.

PR #10730 <https://github.com/apache/druid/pull/10730> introduced support for Kafka headers in InputFormats.

We still need an input format to parse out the headers and translate them into relevant columns in Druid. Until that is implemented, none of the information available in the Kafka message headers is exposed. So, first, there is a need to implement an input format that can parse headers in any given format (provided we support the format), just like we parse payloads today. Apart from the headers, there is also some useful information present in the key portion of the Kafka record, so we also need a way to expose the data present in the key as Druid columns. We need a generic way to express at configuration time which attributes from the headers, key, and payload should be ingested into Druid, and the design should be generic enough that users can specify different parsers for the headers, key, and payload.

The proposal is to design an input format that solves the above by providing a wrapper around any existing input format and merging the data into a single unified Druid row.

Proposed changes

Let's look at a sample input format from the above discussion:

"inputFormat":
{
  "type": "kafka",                          // New input format type
  "headerLabelPrefix": "kafka.header.",     // Label prefix for header columns; avoids collisions while merging columns
  "recordTimestampLabelPrefix": "kafka.",   // Kafka record's timestamp is made available in case the payload does not carry a timestamp
  "headerFormat":                           // Header parser specifying that values are of type string
  {
    "type": "string"
  },
  "valueFormat":                            // Value parser, from JSON parsing
  {
    "type": "json",
    "flattenSpec":
    {
      "useFieldDiscovery": true,
      "fields": [...]
    }
  },
  "keyFormat":                              // Key parser, also from JSON parsing
  {
    "type": "json"
  }
}

Since we have independent sections for the header, key, and payload, each section can be parsed with its own parser, e.g., headers coming in as strings and the payload as JSON.

KafkaInputFormat (the new InputFormat class) will be the uber class extending the InputFormat interface. It will be responsible for creating the individual parsers for the header, key, and payload, blending the data, resolving column conflicts, and generating a single unified InputRow for Druid ingestion.
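To make the merge behavior concrete, here is a purely illustrative example of how a record could map to a unified row under the sample configuration above; the record contents, column values, and the exact name of the timestamp column are hypothetical and not taken from the implementation:

A hypothetical incoming Kafka record:

  headers:   env=production, cluster=lkc-123        // parsed by headerFormat (string)
  key:       {"clientId": "client-42"}              // parsed by keyFormat (json)
  value:     {"metric": "bytes-in", "count": 100}   // parsed by valueFormat (json)
  timestamp: 1630359780000                          // Kafka record timestamp

could produce a unified Druid row along the lines of:

  kafka.header.env     = "production"
  kafka.header.cluster = "lkc-123"
  kafka.key            = "client-42"                // key exposed under the proposed "kafka.key" dimension
  kafka.timestamp      = 1630359780000              // record timestamp under the "kafka." prefix
  metric               = "bytes-in"
  count                = 100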
> > > > "headerFormat" will allow users to plug in a parser type for the header > > values and will add the default header prefix as "kafka.header."(can be > > overridden) for attributes to avoid collision while merging attributes > with > > payload. > > > > Kafka payload parser will be responsible for parsing the Value portion of > > the Kafka record. This is where most of the data will come from and we > > should be able to plugin existing parsers. One thing to note here is that > > if batching is performed, then the code should be augmenting header and > key > > values to every record in the batch. > > > > Kafka key parser will handle parsing the Key portion of the Kafka record > > and will ingest the Key with dimension name as "kafka.key". > > Operational impact, Test plan & Future work > > > > Since we had an immediate need to ingest blended data from header and > > payload, we have implemented the above proposal in a PR - here > > <https://github.com/apache/druid/pull/11630> > > -Lokesh Lingarajan > > >