[ https://issues.apache.org/jira/browse/KAFKA-6614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-6614.
------------------------------------
    Fix Version/s: 2.5.0
       Resolution: Fixed

> kafka-streams to configure internal topics message.timestamp.type=CreateTime
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-6614
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6614
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Dmitry Vsekhvalnov
>            Assignee: Sophie Blee-Goldman
>            Priority: Minor
>              Labels: newbie
>             Fix For: 2.5.0
>
>
> After the fix for KAFKA-4785, all internal topics use the built-in
> *RecordMetadataTimestampExtractor* to read timestamps.
> This doesn't seem to work correctly out of the box with Kafka brokers configured
> with *log.message.timestamp.type=LogAppendTime* when a custom message
> timestamp extractor is used.
> Example use case, windowed grouping + aggregation on late data:
> {code:java}
> KTable<Windowed<Tuple>, Long> summaries = in
>     .groupBy((key, value) -> ......)
>     .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
>     .count();
> {code}
> When processing late events:
> # the custom timestamp extractor picks up a timestamp in the past from the message (say, an hour ago)
> # during the grouping phase, the record is written back to Kafka into the repartition topic using the timestamp from (1)
> # the Kafka broker ignores the timestamp provided in (2) in favor of ingestion time
> # the Streams library reads the repartition topic back with RecordMetadataTimestampExtractor
> # and gets the ingestion timestamp from (3), which is usually close to "now"
> # window start/end are therefore set based on "now" instead of the original timestamp from the payload
> I understand there are ways to configure the timestamp type per topic on the Kafka
> brokers to solve this, but it would be really nice if the kafka-streams library
> could take care of it itself.
> This would follow the principle of least surprise: if the library relies on the
> timestamp type of a topic it manages, it should enforce it.
> CC [~guozhang] based on user group email discussion.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
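The six-step failure described in the ticket can be reproduced without a broker. The sketch below is a hypothetical, JDK-only Java simulation: `TimestampTypeDemo`, `storedTimestamp` and `windowStartAfterRepartition` are illustrative names, not Kafka APIs. It assumes 1-hour tumbling windows aligned to the epoch (as in the `TimeWindows.of(...)` example) and, for simplicity, that the broker's append time equals "now". It shows the same late event landing in a different window depending on the repartition topic's timestamp type.

```java
import java.time.Duration;
import java.time.Instant;

class TimestampTypeDemo {
    // Hypothetical model of the two values of the topic config message.timestamp.type.
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // 1-hour tumbling windows, matching TimeWindows.of(TimeUnit.HOURS.toMillis(1L)).
    static final long WINDOW_SIZE_MS = Duration.ofHours(1).toMillis();

    // Step (3): the timestamp the broker stores when it appends a record.
    // Simplification: the append time is taken to be brokerNowMs.
    static long storedTimestamp(TimestampType type, long producerTs, long brokerNowMs) {
        return type == TimestampType.CREATE_TIME ? producerTs : brokerNowMs;
    }

    // Start of the epoch-aligned tumbling window that contains ts (assumes ts >= 0).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // Steps (1)-(6) for a single late record.
    static long windowStartAfterRepartition(TimestampType type, long payloadTs, long brokerNowMs) {
        long written = payloadTs;                                    // (1)-(2) extractor value sent with the record
        long readBack = storedTimestamp(type, written, brokerNowMs); // (3)-(5) read back from record metadata
        return windowStart(readBack);                                // (6) window assignment
    }

    public static void main(String[] args) {
        long now = Instant.parse("2018-03-05T12:30:00Z").toEpochMilli();
        long lateEvent = now - Duration.ofHours(1).toMillis(); // payload timestamp: 11:30 UTC
        for (TimestampType type : TimestampType.values()) {
            long start = windowStartAfterRepartition(type, lateEvent, now);
            System.out.println(type + " -> window starts at " + Instant.ofEpochMilli(start));
        }
    }
}
```

Running `main` prints a window start of 2018-03-05T11:00:00Z for `CREATE_TIME` and 2018-03-05T12:00:00Z for `LOG_APPEND_TIME`: with LogAppendTime the late event is counted in the current hour instead of the hour its payload timestamp belongs to.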
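The ticket mentions configuring the timestamp type per topic as a workaround. On Kafka Streams versions without this fix (the ticket lists Fix Version 2.5.0), one client-side option is to pass topic-level configs with the `topic.` prefix, which Streams applies to the internal topics it creates. The sketch below assumes that mechanism and spells the keys as plain strings so it compiles with the JDK alone; `InternalTopicTimestampConfig`, `streamsProperties`, the application id and the bootstrap address are hypothetical placeholders.

```java
import java.util.Properties;

class InternalTopicTimestampConfig {
    // Prefix Kafka Streams uses for configs applied to the internal topics it creates
    // (StreamsConfig.TOPIC_PREFIX). Spelled out here so this sketch needs no Kafka jars.
    static final String TOPIC_PREFIX = "topic.";
    // Topic-level config name (TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG).
    static final String MESSAGE_TIMESTAMP_TYPE = "message.timestamp.type";

    // Hypothetical helper: builds Streams properties with CreateTime for internal topics.
    static Properties streamsProperties(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Ask Streams to create its internal (repartition/changelog) topics with CreateTime,
        // so the timestamps written in step (2) are kept by the broker.
        props.put(TOPIC_PREFIX + MESSAGE_TIMESTAMP_TYPE, "CreateTime");
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties("summaries-app", "localhost:9092");
        System.out.println(props.getProperty(TOPIC_PREFIX + MESSAGE_TIMESTAMP_TYPE));
    }
}
```

With kafka-streams on the classpath, the same key is usually written as `StreamsConfig.topicPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)`. Because these settings are applied when Streams creates a topic, internal topics that already exist would likely need to be altered on the broker instead.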