Any updates/comments?

-Lokesh


On Tue, Sep 7, 2021 at 9:29 AM Lokesh Lingarajan <llingara...@confluent.io>
wrote:

> Hope everyone had a good long weekend. Any updates/comments?
>
> -Lokesh
>
>
> On Mon, Aug 30, 2021 at 2:43 PM Lokesh Lingarajan <
> llingara...@confluent.io> wrote:
>
>> Motivation
>>
>> Today we ingest a number of high-cardinality metrics into Druid across
>> dimensions. These metrics are rolled up on a per-minute basis and are very
>> useful when looking at metrics on a per-partition or per-client basis.
>> Events are another class of data that provide useful information about a
>> particular incident or scenario inside a Kafka cluster. The events
>> themselves are carried inside the Kafka payload, but there is also very
>> useful metadata carried in the Kafka headers that can serve as dimensions
>> for aggregation and, in turn, yield better insights.
>>
>> PR #10730 (<https://github.com/apache/druid/pull/10730>) introduced
>> support for Kafka headers in InputFormats.
>>
>> We still need an input format that parses the headers and translates them
>> into the relevant Druid columns; until that is implemented, none of the
>> information available in the Kafka message headers is exposed. So the
>> first need is an input format that can parse headers in any given format
>> (provided we support that format), just as we parse payloads today. Apart
>> from the headers, there is also useful information in the key portion of
>> the Kafka record, so we also need a way to expose the data present in the
>> key as Druid columns. Finally, we need a generic way to express at
>> configuration time which attributes from the headers, key, and payload
>> should be ingested into Druid, and the design should be generic enough
>> that users can specify different parsers for the headers, key, and payload.
>>
>> The proposal is to design an input format that solves the above by
>> providing a wrapper around any existing input formats and merging the data
>> into a single unified Druid row.
>> Proposed changes
>>
>> Let's look at a sample input format configuration based on the above discussion:
>>
>> "inputFormat":
>> {
>>     "type": "kafka",                          // New input format type
>>     "headerLabelPrefix": "kafka.header.",     // Label prefix for header columns; avoids collisions while merging columns
>>     "recordTimestampLabelPrefix": "kafka.",   // Kafka record's timestamp is made available in case the payload does not carry a timestamp
>>     "headerFormat":                           // Header parser specifying that values are of type string
>>     {
>>         "type": "string"
>>     },
>>     "valueFormat":                            // Value parser using JSON parsing
>>     {
>>         "type": "json",
>>         "flattenSpec":
>>         {
>>             "useFieldDiscovery": true,
>>             "fields": [...]
>>         }
>>     },
>>     "keyFormat":                              // Key parser, also using JSON parsing
>>     {
>>         "type": "json"
>>     }
>> }
>>
>> Since there are independent sections for the header, key, and payload,
>> each section can be parsed with its own parser, e.g., headers coming in as
>> strings and the payload as JSON.
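>>
>> As an illustration only (assuming Druid's Avro stream input format,
>> "avro_stream", is available in the deployment), the same wrapper could
>> combine string headers with an Avro payload and a JSON key:
>>
>>     "headerFormat": { "type": "string" },
>>     "valueFormat":  { "type": "avro_stream", ... },
>>     "keyFormat":    { "type": "json" }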
>>
>> KafkaInputFormat (the new InputFormat class) will be the umbrella class
>> implementing the InputFormat interface. It will be responsible for
>> creating the individual parsers for the header, key, and payload, blending
>> the data while resolving column conflicts, and generating a single unified
>> InputRow for Druid ingestion.
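>>
>> As a rough, self-contained sketch of the intended merge behaviour (the
>> class, method, and exact column names below are illustrative, not the
>> actual Druid API):
>>
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> // Illustrative sketch: blends the three parsed sections into one unified
>> // row, using the configured prefixes to avoid column collisions.
>> public class KafkaRecordMergeSketch {
>>
>>     static Map<String, Object> mergeRow(Map<String, String> headers,
>>                                         Object key,
>>                                         Map<String, Object> payload,
>>                                         long recordTimestampMillis) {
>>         Map<String, Object> row = new HashMap<>(payload);            // payload fields keep the base namespace
>>         headers.forEach((k, v) -> row.put("kafka.header." + k, v));  // headers go under the configured prefix
>>         row.put("kafka.key", key);                                   // key ingested under a fixed dimension name
>>         row.put("kafka.timestamp", recordTimestampMillis);           // record timestamp, in case the payload has none (column name assumed)
>>         return row;
>>     }
>>
>>     public static void main(String[] args) {
>>         Map<String, String> headers = new HashMap<>();
>>         headers.put("environment", "staging");
>>         Map<String, Object> payload = new HashMap<>();
>>         payload.put("clientId", "c-42");
>>         payload.put("latencyMs", 17);
>>         System.out.println(mergeRow(headers, "order-123", payload, 1630368000000L));
>>         // prints something like (map order not guaranteed):
>>         // {clientId=c-42, latencyMs=17, kafka.header.environment=staging,
>>         //  kafka.key=order-123, kafka.timestamp=1630368000000}
>>     }
>> }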
>>
>> "headerFormat" will allow users to plug in a parser type for the header
>> values and will add the default header prefix as "kafka.header."(can be
>> overridden) for attributes to avoid collision while merging attributes with
>> payload.
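>>
>> For example (the header name is chosen only for illustration), a Kafka
>> record header such as
>>
>>     cluster-id: lkc-abc123
>>
>> would surface as the Druid column
>>
>>     kafka.header.cluster-id = "lkc-abc123"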
>>
>> The Kafka payload parser will be responsible for parsing the value portion
>> of the Kafka record. This is where most of the data will come from, and we
>> should be able to plug in existing parsers. One thing to note here is that
>> if batching is performed, the code should augment every record in the
>> batch with the header and key values.
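>>
>> For instance (illustrative payload only), if a single Kafka record's value
>> carries two events,
>>
>>     {"clientId": "a", "latencyMs": 10}
>>     {"clientId": "b", "latencyMs": 12}
>>
>> the reader should emit two Druid rows, each carrying copies of the same
>> kafka.header.* and kafka.key values from that record.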
>>
>> The Kafka key parser will handle parsing the key portion of the Kafka
>> record and will ingest the key under the dimension name "kafka.key".
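>>
>> For example (illustrative, and assuming the key resolves to a single
>> value), a record whose key parses to "user-42" would contribute the column
>> kafka.key = "user-42" to the unified row.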
>> Operational impact, Test plan & Future work
>>
>> Since we had an immediate need to ingest blended data from headers and
>> payload, we have implemented the above proposal in a PR - here
>> <https://github.com/apache/druid/pull/11630>.
>> -Lokesh Lingarajan
>>
>
