[ https://issues.apache.org/jira/browse/KAFKA-10722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17231972#comment-17231972 ]
fml2 edited comment on KAFKA-10722 at 11/14/20, 10:29 AM:
----------------------------------------------------------

Hello,

thank you [~mjsax] for the quick response! This makes perfect sense.

Could you please explain why the aggregate operation requires a timestamped store? The operation is not windowed, if I understand correctly. If it were windowed (by time), I would understand it. Why is the timestamp needed?

And one more thing (a proposal): if such usage is discouraged, wouldn't it make sense to log a warning saying so? Or to deprecate the API so that IDEs warn developers? As of now, the API misleads developers into wrong usage.


was (Author: fml2):

Hello,

thank you for the quick response! This makes perfect sense.

Could you please explain why the aggregate operation requires a timestamped store? The operation is not windowed, if I understand correctly. If it were windowed (by time), I would understand it. Why is the timestamp needed?

And one more thing (a proposal): if such usage is discouraged, wouldn't it make sense to log a warning saying so? Or to deprecate the API so that IDEs warn developers? As of now, the API misleads developers into wrong usage.

> Timestamped store is used even if not desired
> ---------------------------------------------
>
>                 Key: KAFKA-10722
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10722
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1, 2.6.0
>            Reporter: fml2
>            Priority: Major
>
> I have a stream which I then group and aggregate (this results in a KTable).
> When aggregating, I explicitly request that the result table be materialized
> using a regular (non-timestamped) store.
> After that, the KTable is filtered and streamed. This stream is processed by
> a processor that accesses the store.
> The problem/bug is that even though I request a non-timestamped store, a
> timestamped one is used, which leads to a ClassCastException in the processor
> (it iterates over the store and expects the items to be of type "KeyValue",
> but they are of type "ValueAndTimestamp").
> Here is the code (schematically).
> First, I define the topology:
> {code:java}
> KTable table = ...aggregate(
>     initializer, // initializer for the KTable row
>     aggregator, // aggregator
>     Materialized.as(Stores.persistentKeyValueStore("MyStore")) // <-- Non-Timestamped!
>         .withKeySerde(...).withValueSerde(...));
> table.toStream().process(theProcessor);
> {code}
> In the class for the processor:
> {code:java}
> public void init(ProcessorContext context) {
>     var store = context.getStateStore("MyStore"); // Returns a TimestampedKeyValueStore!
> }
> {code}
> A timestamped store is returned even though I explicitly requested a
> non-timestamped one!
>
> I tried to find the cause of this behaviour and think that I've found it. It
> lies in this line:
> [https://github.com/apache/kafka/blob/cfc813537e955c267106eea989f6aec4879e14d7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L241]
> There, TimestampedKeyValueStoreMaterializer is used regardless of whether
> the materialization supplier is a timestamped one or not.
> I think this is a bug.
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
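
The ClassCastException described in the issue can be illustrated without Kafka at all. The following is only a minimal sketch, not Kafka Streams code: `StampedValue`, `TimestampedStoreSketch`, and the `Map`-based "store" are hypothetical stand-ins invented for illustration. It assumes a store that wraps every value together with a timestamp, and compares a reader that expects plain values with one that unwraps defensively.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a value paired with a timestamp; NOT the Kafka class. */
final class StampedValue<V> {
    final V value;
    final long timestamp;

    StampedValue(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class TimestampedStoreSketch {

    /** Simulates a store that wraps every value with a timestamp (assumption for this sketch). */
    static Map<String, Object> timestampedStore() {
        Map<String, Object> store = new LinkedHashMap<>();
        store.put("a", new StampedValue<>(1L, 1000L));
        store.put("b", new StampedValue<>(2L, 2000L));
        return store;
    }

    /** Reader that assumes plain values; fails when the store holds wrappers. */
    static long sumAssumingPlainValues(Map<String, Object> store) {
        long sum = 0;
        for (Object raw : store.values()) {
            sum += (Long) raw; // throws ClassCastException for StampedValue entries
        }
        return sum;
    }

    /** Reader that accepts both plain and wrapped values. */
    static long sumUnwrapping(Map<String, Object> store) {
        long sum = 0;
        for (Object raw : store.values()) {
            Object value = (raw instanceof StampedValue) ? ((StampedValue<?>) raw).value : raw;
            sum += (Long) value;
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Object> store = timestampedStore();
        try {
            sumAssumingPlainValues(store);
        } catch (ClassCastException e) {
            System.out.println("plain read failed: ClassCastException");
        }
        System.out.println("unwrapped sum = " + sumUnwrapping(store));
    }
}
```

In a real topology the analogous workaround would presumably be to treat the store's values as timestamp wrappers when reading them in the processor, but that depends on the Kafka Streams version in use and is not confirmed by this thread.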