[ https://issues.apache.org/jira/browse/KAFKA-10722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
fml2 updated KAFKA-10722: ------------------------- Description: I have a stream which I then group and aggregate (this results in a KTable). When aggregating, I explicitly tell to materialize the result table using a usual (not timestamped) store. After that, the KTable is filtered and streamed. This stream is processed by a processor that accesses the store. The problem/bug is that even if I tell to use a non-timestamped store, a timestamped one is used, which leads to a ClassCastException in the processor (it iterates over the store and expects the items to be of type "KeyValue" but they are of type "ValueAndTimestamp"). Here is a schema of the code. First, I define the topology: {code:java} KTable table = ...aggregate( initializer, // initializer for the KTable row aggregator, // aggregator Materialized.as(Stores.persistentKeyValueStore("MyStore")) // <-- Non-Timestamped! .withKeySerde(...).withValueSerde(...)); table.toStream().process(theProcessor); {code} In the class for the processor: {code:java} public void init(ProcessorContext context) { var store = context.getStateStore("MyStore"); // Returns a TimestampedKeyValueStore! } {code} A timestamped store is returned even if I explicitly told to use a non-timestamped one! I tried to find the cause for this behaviour and think that I've found it. It lies in this line: [https://github.com/apache/kafka/blob/cfc813537e955c267106eea989f6aec4879e14d7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L241] There, TimestampedKeyValueStoreMaterializer is used regardless whether materialization supplier is a timestamped one or not. I think this is a bug. was: I have a stream which I then group and aggregate (this results in a KTable). When aggregating, I explicitly tell to materialize the result table using a usual (not timestamped) store. After that, the KTable is filtered and streamed. This stream is processed by a processor that accesses the store. The problem/bug is that even if I tell to use a non-timestamped store, a timestamped one is used, which leads to a ClassCastException in the processor (it iterates over the store and expects the items to be of type "KeyValue" but htey are of type "ValueAndTimestamp"). Here is a schema of the code. First, I define the topology: {code:java} KTable table = ...aggregate( initializer, // initializer for the KTable row aggregator, // aggregator Materialized.as(Stores.persistentKeyValueStore("MyStore")) // <-- Non-Timestamped! .withKeySerde(...).withValueSerde(...)); table.toStream().process(theProcessor); {code} In the class for the processor: {code:java} public void init(ProcessorContext context) { var store = context.getStateStore("MyStore"); // Returns a TimestampedKeyValueStore! } {code} A timestamped store is returned even if I explicitly told to use a non-timestamped one! I tried to find the cause for this behaviour and think that I've found it. It lies in this line: [https://github.com/apache/kafka/blob/cfc813537e955c267106eea989f6aec4879e14d7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L241] There, TimestampedKeyValueStoreMaterializer is used regardless whether materialization supplier is a timestamped one or not. I think this is a bug. > Timestamped store is used even if not desired > --------------------------------------------- > > Key: KAFKA-10722 > URL: https://issues.apache.org/jira/browse/KAFKA-10722 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.4.1, 2.6.0 > Reporter: fml2 > Priority: Major > > I have a stream which I then group and aggregate (this results in a KTable). > When aggregating, I explicitly tell to materialize the result table using a > usual (not timestamped) store. > After that, the KTable is filtered and streamed. This stream is processed by > a processor that accesses the store. > The problem/bug is that even if I tell to use a non-timestamped store, a > timestamped one is used, which leads to a ClassCastException in the processor > (it iterates over the store and expects the items to be of type "KeyValue" > but they are of type "ValueAndTimestamp"). > Here is a schema of the code. > First, I define the topology: > {code:java} > KTable table = ...aggregate( > initializer, // initializer for the KTable row > aggregator, // aggregator > Materialized.as(Stores.persistentKeyValueStore("MyStore")) // <-- > Non-Timestamped! > .withKeySerde(...).withValueSerde(...)); > table.toStream().process(theProcessor); > {code} > In the class for the processor: > {code:java} > public void init(ProcessorContext context) { > var store = context.getStateStore("MyStore"); // Returns a > TimestampedKeyValueStore! > } > {code} > A timestamped store is returned even if I explicitly told to use a > non-timestamped one! > > I tried to find the cause for this behaviour and think that I've found it. It > lies in this line: > [https://github.com/apache/kafka/blob/cfc813537e955c267106eea989f6aec4879e14d7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L241] > There, TimestampedKeyValueStoreMaterializer is used regardless whether > materialization supplier is a timestamped one or not. > I think this is a bug. > -- This message was sent by Atlassian Jira (v8.3.4#803005)