[ https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330987#comment-17330987 ]
Guozhang Wang commented on KAFKA-10493:
---------------------------------------

While thinking about https://issues.apache.org/jira/browse/KAFKA-12693, one idea is that if we restore based on timestamps instead of offsets (of course, it is not 100 percent safe in practice, since timestamps may be inaccurate as well, but it should be better than offsets), then even if there are zombies writing to changelogs, we would be much less vulnerable to such scenarios.

Following that thought, in the long run we would move to:

1) Version tables based on timestamps during normal state store materialization as well as restoration, so that we do not depend on offset ordering to update a single snapshot.
2) Timestamp-based compaction (KIP-280) with a compaction horizon (i.e. we could choose not to compact old values that are still within the maintained old-version period).

Then, as of now, while we are still "keeping the latest version", to align with that future we should 1) depend on timestamps, not offsets, for materialization and restoration, and 2) complete KIP-280 to do compaction based on timestamps. And then for 3) source changelog topics, Kafka Streams would require / assume the topic is already configured with the right compaction policy, while at the caller level (e.g. KSQL) we would duplicate that data if necessary, e.g. if the source topic is external and won't guarantee the right configs are set.

> KTable out-of-order updates are not being ignored
> -------------------------------------------------
>
>                 Key: KAFKA-10493
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10493
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Pedro Gontijo
>            Assignee: Matthias J. Sax
>            Priority: Blocker
>             Fix For: 3.0.0
>
>         Attachments: KTableOutOfOrderBug.java
>
> On a materialized KTable, out-of-order records for a given key (records whose timestamps are older than the value currently in the store) are not being ignored; they update the local store value and are also forwarded downstream.
> I believe the bug is here:
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77]
> It should return true, not false (see the javadoc).
> The bug has impact here:
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148]
> I have attached a simple stream app that shows the issue happening.
> Thank you!
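
To make the comment's proposal concrete, here is a minimal sketch of timestamp-gated materialization and restoration. All class and method names are hypothetical and this is not the actual Kafka Streams implementation; it only illustrates the "drop out-of-order updates by timestamp" semantics discussed above, with the same guard reused during changelog restoration so offset ordering no longer matters.

{code:java}
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical illustration of timestamp-gated materialization/restoration.
 * NOT actual Kafka Streams code; it only sketches the semantics discussed
 * in the comment above.
 */
public class TimestampGatedStore<K, V> {

    // Simplified in-memory "store": latest value plus the timestamp it carries.
    private static class ValueAndTimestamp<V> {
        final V value;
        final long timestamp;
        ValueAndTimestamp(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, ValueAndTimestamp<V>> store = new HashMap<>();

    /**
     * Apply an update only if it is not older than what we already hold.
     * Returns true if the update was applied (and should be forwarded),
     * false if it was dropped as out-of-order.
     */
    public boolean maybePut(final K key, final V value, final long timestamp) {
        final ValueAndTimestamp<V> current = store.get(key);
        if (current != null && timestamp < current.timestamp) {
            // Out-of-order by timestamp: ignore, do not forward downstream.
            return false;
        }
        store.put(key, new ValueAndTimestamp<>(value, timestamp));
        return true;
    }

    /**
     * Restoration uses the same timestamp guard, so replaying changelog
     * records (possibly including stale writes from a zombie producer)
     * cannot overwrite a newer value with an older one.
     */
    public void restore(final K key, final V value, final long timestamp) {
        maybePut(key, value, timestamp);
    }
}
{code}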
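
And, for the reported bug itself, a hedged sketch of the check the issue description refers to. The class, method, and byte layout below are assumptions made for illustration only (they are not the real ValueAndTimestampSerializer code): the intended behavior is that a record whose timestamp is strictly older than the stored one is treated as out-of-order and ignored, while the report states the check at the linked line returned the wrong boolean, so such records were applied and forwarded instead.

{code:java}
import java.nio.ByteBuffer;

/**
 * Hypothetical illustration of the out-of-order check described in the bug
 * report above. NOT the real ValueAndTimestampSerializer code; the serialized
 * layout (8-byte timestamp followed by the value) is an assumption made for
 * this sketch only.
 */
public final class OutOfOrderCheck {

    private OutOfOrderCheck() { }

    /**
     * Returns true when the new record should be ignored because its
     * timestamp is strictly older than the stored one.
     */
    public static boolean isOutOfOrder(final byte[] oldValueAndTimestamp,
                                       final byte[] newValueAndTimestamp) {
        if (oldValueAndTimestamp == null) {
            // Nothing stored yet: the new record cannot be out-of-order.
            return false;
        }
        final long oldTimestamp = ByteBuffer.wrap(oldValueAndTimestamp).getLong();
        final long newTimestamp = ByteBuffer.wrap(newValueAndTimestamp).getLong();
        // Per the bug report, inverting this result means out-of-order records
        // are applied and forwarded instead of being ignored.
        return newTimestamp < oldTimestamp;
    }
}
{code}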