[ https://issues.apache.org/jira/browse/KAFKA-6614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007178#comment-17007178 ]

ASF GitHub Bot commented on KAFKA-6614:
---------------------------------------

ableegoldman commented on pull request #7889: KAFKA-6614: "deprecate" 
LogAppendTime for internal topics and use CreateTime
URL: https://github.com/apache/kafka/pull/7889
 
 
   The idea is to ultimately "deprecate" the LogAppendTime timestamp type for 
internal topics, which currently inherit it whenever the server default is set 
to LogAppendTime. Because this overwrites the message time set by Streams when 
records are written to changelog/repartition topics, it can lead to 
non-deterministic and/or unexpected results.
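   To illustrate why the overwritten timestamp matters, here is a minimal 
sketch (not code from the PR; the class and helper are hypothetical) of how a 
tumbling-window aggregation assigns a record to a window based on its 
timestamp:

```java
import java.util.concurrent.TimeUnit;

public class WindowAssignment {
    // Tumbling windows of one hour, mirroring TimeWindows.of(...) in the issue.
    static final long WINDOW_SIZE_MS = TimeUnit.HOURS.toMillis(1L);

    // A record falls into the window whose start is its timestamp
    // rounded down to a multiple of the window size.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        long now = TimeUnit.HOURS.toMillis(10L);    // ingestion ("append") time
        long eventTime = now - WINDOW_SIZE_MS;      // late event from an hour ago

        // With CreateTime preserved, the late event lands in its original window;
        // if the broker rewrites the timestamp to LogAppendTime, the same record
        // is misassigned to the current window instead.
        System.out.println(windowStart(eventTime)); // start of the hour-9 window
        System.out.println(windowStart(now));       // start of the hour-10 window
    }
}
```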
   
   To avoid breaking changes, we simply start creating new internal topics 
with CreateTime explicitly set, and log a warning if internal topics already 
exist that use LogAppendTime.
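   For reference, setting the timestamp type explicitly at the topic level 
overrides the broker default; a sketch using the standard kafka-topics tool 
(the topic name and bootstrap address are placeholders):

```shell
# Create a topic whose message.timestamp.type is CreateTime, regardless of
# the broker's log.message.timestamp.type default.
kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic my-app-repartition \
  --config message.timestamp.type=CreateTime
```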
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> kafka-streams to configure internal topics message.timestamp.type=CreateTime
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-6614
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6614
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Dmitry Vsekhvalnov
>            Assignee: Meghana Gupta
>            Priority: Minor
>              Labels: newbie
>
> After fixing KAFKA-4785, all internal topics use the built-in 
> *RecordMetadataTimestampExtractor* to read timestamps.
> This does not work correctly out of the box when kafka brokers are configured 
> with *log.message.timestamp.type=LogAppendTime* and a custom message 
> timestamp extractor is used.
> Example use-case windowed grouping + aggregation on late data:
> {code:java}
> KTable<Windowed<Tuple>, Long> summaries = in
>    .groupBy((key, value) -> ......)
>    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
>    .count();{code}
> when processing late events:
>  # the custom timestamp extractor will pick up a timestamp in the past from 
> the message (say, an hour ago)
>  # the re-partition topic produced during the grouping phase will be written 
> back to kafka using the timestamp from (1)
>  # the kafka broker will ignore the timestamp provided in (2) in favor of 
> ingestion time
>  # the streams lib will read the re-partitioned topic back with 
> RecordMetadataTimestampExtractor
>  # and will get the ingestion timestamp from (3), which is usually close to 
> "now"
>  # the window start/end will be incorrectly set based on "now" instead of the 
> original timestamp from the payload
> We understand there are ways to configure the per-topic timestamp type on 
> kafka brokers to solve this, but it would be really nice if the 
> kafka-streams library could take care of it itself, following the principle 
> of least surprise: if the library relies on the timestamp.type of a topic it 
> manages, it should enforce it.
> CC [~guozhang] based on user group email discussion.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
