[ https://issues.apache.org/jira/browse/KAFKA-6614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010087#comment-17010087 ]

ASF GitHub Bot commented on KAFKA-6614:
---------------------------------------

mjsax commented on pull request #7889: KAFKA-6614: configure internal topics 
with message.timestamp.type=CreateTime by default
URL: https://github.com/apache/kafka/pull/7889
 
 
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> kafka-streams to configure internal topics message.timestamp.type=CreateTime
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-6614
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6614
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Dmitry Vsekhvalnov
>            Assignee: Meghana Gupta
>            Priority: Minor
>              Labels: newbie
>
> After the fix for KAFKA-4785, all internal topics use the built-in 
> *RecordMetadataTimestampExtractor* to read timestamps.
> This doesn't work correctly out of the box when Kafka brokers are configured 
> with *log.message.timestamp.type=LogAppendTime* and a custom message 
> timestamp extractor is used.
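> For illustration only, a minimal payload-based extractor of the kind meant above 
> could look like this (the event type and its accessor are hypothetical, not part 
> of this ticket):
> {code:java}
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.streams.processor.TimestampExtractor;
> 
> public class PayloadTimestampExtractor implements TimestampExtractor {
>     @Override
>     public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
>         final Object value = record.value();
>         if (value instanceof MyEvent) {                 // MyEvent is a hypothetical payload type
>             return ((MyEvent) value).getEventTimeMs();  // event time embedded in the payload
>         }
>         return partitionTime;                           // otherwise fall back to partition time
>     }
> }
> {code}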
> Example use case: windowed grouping + aggregation on late data:
> {code:java}
> KTable<Windowed<Tuple>, Long> summaries = in
>    .groupBy((key, value) -> ......)
>    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
>    .count();
> {code}
> When processing late events:
>  # the custom timestamp extractor picks up a timestamp in the past from the 
> message (say, an hour ago)
>  # the re-partition topic written during the grouping phase is sent back to 
> Kafka using the timestamp from (1)
>  # the Kafka broker ignores the timestamp provided in (2) in favor of 
> ingestion time (LogAppendTime)
>  # the streams library reads the re-partitioned topic back with the 
> RecordMetadataTimestampExtractor
>  # and therefore gets the ingestion timestamp from (3), which is usually 
> close to "now"
>  # window start/end are incorrectly set based on "now" instead of the 
> original timestamp from the payload
> I understand there are ways to configure the per-topic timestamp type on the 
> Kafka brokers to solve this, but it would be really nice if the kafka-streams 
> library took care of it itself, following the principle of least surprise: if 
> the library relies on timestamp.type for a topic it manages, it should 
> enforce it.
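> As a concrete workaround today (a sketch, assuming the brokers permit 
> topic-level overrides), the per-topic setting can be passed through the 
> Streams config so that the internal repartition/changelog topics Streams 
> creates use CreateTime:
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.common.config.TopicConfig;
> import org.apache.kafka.streams.StreamsConfig;
> 
> final Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // illustrative app id
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative bootstrap
> // The "topic." prefix tells Streams to apply this config to the internal
> // topics it creates (repartition and changelog topics).
> props.put(StreamsConfig.topicPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG), "CreateTime");
> {code}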
> CC [~guozhang] based on user group email discussion.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
