Any updates/comments?

-Lokesh
On Tue, Sep 7, 2021 at 9:29 AM Lokesh Lingarajan <llingara...@confluent.io> wrote:

Hope everyone had a good long weekend. Any updates/comments?

-Lokesh

On Mon, Aug 30, 2021 at 2:43 PM Lokesh Lingarajan <llingara...@confluent.io> wrote:

Motivation

Today we ingest a number of high-cardinality metrics into Druid across many dimensions. These metrics are rolled up on a per-minute basis and are very useful when looking at metrics on a per-partition or per-client basis. Events are another class of data that provide useful information about a particular incident/scenario inside a Kafka cluster. The events themselves are carried inside the Kafka payload, but there is also some very useful metadata carried in the Kafka headers that can serve as dimensions for aggregation and, in turn, bring better insights.

PR #10730 <https://github.com/apache/druid/pull/10730> introduced support for Kafka headers in InputFormats.

We still need an input format to parse out the headers and translate them into relevant columns in Druid. Until that is implemented, none of the information available in the Kafka message headers is exposed. So first we need an input format that can parse headers in any given format (provided we support the format), just like we parse payloads today. Apart from the headers, there is also useful information in the key portion of the Kafka record, so we also need a way to expose the data present in the key as Druid columns. Finally, we need a generic way to express at configuration time which attributes from the headers, key and payload should be ingested into Druid, and the design should be generic enough that users can specify different parsers for the headers, key and payload.

The proposal is to design an input format that solves the above by providing a wrapper around any existing input formats and merging the data into a single unified Druid row.

Proposed changes

Let's look at a sample input format from the above discussion:

"inputFormat": {
  "type": "kafka",                          // New input format type
  "headerLabelPrefix": "kafka.header.",     // Label prefix for header columns; avoids collisions while merging columns
  "recordTimestampLabelPrefix": "kafka.",   // Kafka record's timestamp is made available in case the payload does not carry a timestamp
  "headerFormat": {                         // Header parser specifying that values are of type string
    "type": "string"
  },
  "valueFormat": {                          // Value parser, from json parsing
    "type": "json",
    "flattenSpec": {
      "useFieldDiscovery": true,
      "fields": [...]
    }
  },
  "keyFormat": {                            // Key parser, also from json parsing
    "type": "json"
  }
}

Since we have independent sections for the header, key and payload, each section can be parsed with its own parser, e.g. headers coming in as strings and the payload as JSON.

KafkaInputFormat (the new InputFormat class) will be the umbrella class extending the InputFormat interface. It will be responsible for creating the individual parsers for the header, key and payload, blending the data while resolving column conflicts, and generating a single unified InputRow for Druid ingestion.

"headerFormat" will allow users to plug in a parser type for the header values, and the default header prefix "kafka.header." (which can be overridden) will be added to those attributes to avoid collisions when merging them with the payload.
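To make the blending concrete, here is a made-up record and, roughly, the unified row the sample spec above would produce. The record contents are purely illustrative, and the exact timestamp column name ("kafka.timestamp") is an assumption derived from the "kafka." prefix; the "kafka.key" column follows the key handling described further below.

// Incoming Kafka record (illustrative)
headers:   environment=prod, cluster=lkc-123
key:       "client-7"
timestamp: 1630000000000
payload:   { "metric": "bytes_in", "value": 42 }

// Unified Druid row (roughly) produced by the kafka input format
{
  "kafka.header.environment": "prod",
  "kafka.header.cluster": "lkc-123",
  "kafka.key": "client-7",
  "kafka.timestamp": 1630000000000,
  "metric": "bytes_in",
  "value": 42
}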
The Kafka payload parser will be responsible for parsing the value portion of the Kafka record. This is where most of the data will come from, and we should be able to plug in existing parsers. One thing to note here: if batching is performed, the code should augment every record in the batch with the header and key values.

The Kafka key parser will handle parsing the key portion of the Kafka record and will ingest the key with the dimension name "kafka.key".

Operational impact, Test plan & Future work

Since we had an immediate need to ingest blended data from the header and payload, we have implemented the above proposal in a PR - here <https://github.com/apache/druid/pull/11630>

-Lokesh Lingarajan
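For readers who want to see the shape of the blending step, below is a simplified, self-contained sketch. It is not the actual code from the PR; the class and column names (e.g. KafkaRecordBlendSketch, "kafka.timestamp") are illustrative assumptions based on the prefixes in the sample spec. It also shows the batching point above: header and key values are replicated onto every row the value parser produces.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch only: shows how header and key attributes could be
 * merged onto each parsed payload row while the configured prefixes keep
 * the columns from colliding. Not the actual KafkaInputFormat code.
 */
public class KafkaRecordBlendSketch {

    // Prefixes taken from the sample spec; both are configurable in the proposal.
    private static final String HEADER_PREFIX = "kafka.header.";
    private static final String KEY_COLUMN = "kafka.key";
    private static final String TIMESTAMP_COLUMN = "kafka.timestamp"; // assumed name built from the "kafka." prefix

    /** Blend header, key and record timestamp into every parsed payload row. */
    public static List<Map<String, Object>> blend(
            Map<String, String> parsedHeaders,
            Object parsedKey,
            long recordTimestampMillis,
            List<Map<String, Object>> parsedPayloadRows) {

        List<Map<String, Object>> unifiedRows = new ArrayList<>();
        for (Map<String, Object> payloadRow : parsedPayloadRows) {
            Map<String, Object> row = new HashMap<>(payloadRow); // payload columns keep their own names

            // Header attributes are namespaced so they cannot collide with payload columns.
            parsedHeaders.forEach((name, value) -> row.put(HEADER_PREFIX + name, value));

            // Kafka record key and timestamp become their own columns on every row in the batch.
            row.put(KEY_COLUMN, parsedKey);
            row.put(TIMESTAMP_COLUMN, recordTimestampMillis);

            unifiedRows.add(row);
        }
        return unifiedRows;
    }

    public static void main(String[] args) {
        Map<String, String> headers = Map.of("environment", "prod");
        List<Map<String, Object>> batch = List.of(
                Map.of("metric", "bytes_in", "value", 42),
                Map.of("metric", "bytes_out", "value", 17));

        blend(headers, "client-7", 1_630_000_000_000L, batch).forEach(System.out::println);
    }
}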