[ https://issues.apache.org/jira/browse/KAFKA-10722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17232993#comment-17232993 ]
Matthias J. Sax edited comment on KAFKA-10722 at 11/16/20, 6:55 PM:
--------------------------------------------------------------------

Every record in Kafka Streams has a timestamp, and aggregate() needs to set a timestamp for its output records. It computes the output record timestamp as the "max" over all input records. That is why it needs a timestamped key-value store: to track the maximum timestamp.

Unfortunately, we cannot easily deprecate the API because of Java type erasure... I guess we could log a warn message, though. Feel free to do a PR for it. We could log when we create the `KeyValueToTimestampedKeyValueByteStoreAdapter`.

And yes, you always get a timestamped key-value store, and you can simplify your code accordingly. (Note, though, that if you provide a non-timestamped store, the timestamp won't really be stored: the above-mentioned adapter drops the timestamp before storing the data in the provided store, and on read, the adapter just sets `-1`, i.e., unknown, as the timestamp.) Thus, I would actually recommend passing in a timestamped key-value store to begin with.

I am also open to improving our docs to point out this issue better. At the moment, it seems we only documented it in the upgrade guide when the feature was added: https://kafka.apache.org/26/documentation/streams/upgrade-guide#streams_api_changes_230
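A minimal, stdlib-only Java model of the two effects described in the comment: the aggregate's output timestamp is the max over its input records, and a non-timestamped store behind the adapter loses that timestamp. This is not Kafka Streams code and uses no Kafka API. `ValueAndTimestamp` is a hypothetical stand-in named after the real Kafka class, `PlainStoreAdapter` is a hypothetical simplification of what the comment says `KeyValueToTimestampedKeyValueByteStoreAdapter` does (drop the timestamp on write, return `-1` on read), and the running-sum aggregation is assumed purely for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only model of the behaviour described in the comment; NOT Kafka Streams code.
// All class names below are hypothetical stand-ins.
final class TimestampStoreModel {

    // Stand-in for Kafka's ValueAndTimestamp: a value plus its record timestamp.
    static final class ValueAndTimestamp<V> {
        final V value;
        final long timestamp;

        ValueAndTimestamp(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return value + "@" + timestamp;
        }
    }

    // The aggregation step always talks to a store of this (timestamped) shape.
    interface TimestampedStore<K, V> {
        ValueAndTimestamp<V> get(K key);
        void put(K key, ValueAndTimestamp<V> valueAndTimestamp);
    }

    // A genuinely timestamped store: value and timestamp are kept together.
    static final class InMemoryTimestampedStore<K, V> implements TimestampedStore<K, V> {
        private final Map<K, ValueAndTimestamp<V>> data = new HashMap<>();

        @Override
        public ValueAndTimestamp<V> get(K key) {
            return data.get(key);
        }

        @Override
        public void put(K key, ValueAndTimestamp<V> valueAndTimestamp) {
            data.put(key, valueAndTimestamp);
        }
    }

    // Model of the adapter over a plain (non-timestamped) store: the timestamp
    // is dropped on write, and -1 ("unknown") is reported on read.
    static final class PlainStoreAdapter<K, V> implements TimestampedStore<K, V> {
        private final Map<K, V> plainStore = new HashMap<>();

        @Override
        public ValueAndTimestamp<V> get(K key) {
            V value = plainStore.get(key);
            return value == null ? null : new ValueAndTimestamp<V>(value, -1L);
        }

        @Override
        public void put(K key, ValueAndTimestamp<V> valueAndTimestamp) {
            plainStore.put(key, valueAndTimestamp.value); // timestamp is discarded here
        }
    }

    // One aggregate() update for a running sum (initializer = 0). Returns the
    // emitted record; its timestamp is max(stored timestamp, record timestamp).
    static ValueAndTimestamp<Long> aggregate(
            TimestampedStore<String, Long> store, String key, long value, long recordTimestamp) {
        ValueAndTimestamp<Long> old = store.get(key);
        ValueAndTimestamp<Long> updated = (old == null)
                ? new ValueAndTimestamp<Long>(value, recordTimestamp)
                : new ValueAndTimestamp<Long>(old.value + value, Math.max(old.timestamp, recordTimestamp));
        store.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        TimestampedStore<String, Long> timestamped = new InMemoryTimestampedStore<>();
        TimestampedStore<String, Long> adapted = new PlainStoreAdapter<>();
        for (TimestampedStore<String, Long> store : List.of(timestamped, adapted)) {
            aggregate(store, "k", 5L, 100L);
            // Second record for the same key arrives out of order (older timestamp).
            ValueAndTimestamp<Long> emitted = aggregate(store, "k", 7L, 50L);
            System.out.println(store.getClass().getSimpleName() + " emitted " + emitted);
        }
    }
}
```

With the out-of-order input in `main` (timestamp 100, then 50), the model's timestamped store emits the second update with timestamp 100, whereas the adapter-backed store emits it with timestamp 50, because the earlier maximum was read back as `-1`. In the actual topology, this corresponds to the recommendation above: materialize with `Stores.persistentTimestampedKeyValueStore("MyStore")` and treat the store in the processor as a `TimestampedKeyValueStore` whose values are `ValueAndTimestamp` instances.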
> Timestamped store is used even if not desired
> ---------------------------------------------
>
>                 Key: KAFKA-10722
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10722
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1, 2.6.0
>            Reporter: fml2
>            Priority: Major
>
> I have a stream which I then group and aggregate (this results in a KTable).
> When aggregating, I explicitly specify that the result table should be
> materialized using a regular (non-timestamped) store.
> After that, the KTable is filtered and streamed. This stream is processed by
> a processor that accesses the store.
> The problem/bug is that even though I specify a non-timestamped store, a
> timestamped one is used, which leads to a ClassCastException in the processor
> (it iterates over the store and expects the items to be of type "KeyValue",
> but they are of type "ValueAndTimestamp").
> Here is the code (schematically).
> First, I define the topology:
> {code:java}
> KTable table = ...aggregate(
>     initializer, // initializer for the KTable row
>     aggregator,  // aggregator
>     Materialized.as(Stores.persistentKeyValueStore("MyStore")) // <-- Non-Timestamped!
>         .withKeySerde(...).withValueSerde(...));
> table.toStream().process(theProcessor);
> {code}
> In the class for the processor:
> {code:java}
> public void init(ProcessorContext context) {
>     var store = context.getStateStore("MyStore"); // Returns a TimestampedKeyValueStore!
> }
> {code}
> A timestamped store is returned even though I explicitly asked for a
> non-timestamped one!
>
> I tried to find the cause of this behaviour and think that I've found it.
> It lies in this line:
> [https://github.com/apache/kafka/blob/cfc813537e955c267106eea989f6aec4879e14d7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L241]
> There, TimestampedKeyValueStoreMaterializer is used regardless of whether the
> materialization supplier is a timestamped one or not.
> I think this is a bug.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)