[ 
https://issues.apache.org/jira/browse/KAFKA-10722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

fml2 updated KAFKA-10722:
-------------------------
    Description: 
I have a stream that I group and aggregate, which results in a KTable. 
When aggregating, I explicitly request that the result table be materialized 
in a plain (non-timestamped) store.

After that, the KTable is filtered and streamed. This stream is processed by a 
processor that accesses the store.

The problem is that even though I request a non-timestamped store, a 
timestamped one is used, which leads to a ClassCastException in the processor 
(it iterates over the store and expects the items to be of type "KeyValue", but 
they are of type "ValueAndTimestamp").

Here is a sketch of the code.

First, I define the topology:
{code:java}
KTable table = ...aggregate(
  initializer, // initializer for the KTable row
  aggregator, // aggregator
  // Non-timestamped store supplier!
  Materialized.as(Stores.persistentKeyValueStore("MyStore"))
    .withKeySerde(...).withValueSerde(...));

table.toStream().process(theProcessor);
{code}
In the class for the processor:
{code:java}
public void init(ProcessorContext context) {
   // Returns a TimestampedKeyValueStore!
   var store = context.getStateStore("MyStore");
}
{code}
A timestamped store is returned even though I explicitly asked for a 
non-timestamped one!

 

I tried to find the cause of this behaviour and think that I have found it. It 
lies in this line: 
[https://github.com/apache/kafka/blob/cfc813537e955c267106eea989f6aec4879e14d7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L241]

There, TimestampedKeyValueStoreMaterializer is used regardless of whether the 
materialization supplier is a timestamped one or not.

I think this is a bug.
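
To illustrate the failure mode without depending on Kafka itself, here is a 
minimal, self-contained sketch. It is only a model under assumptions: the class 
TimestampedStoreSketch, its methods, and its nested ValueAndTimestamp class are 
hypothetical stand-ins and not the real Kafka Streams types. It shows a 
materializer that always wraps values, a reader that assumes plain values (and 
fails), and a defensive reader that unwraps when needed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TimestampedStoreSketch {

    // Hypothetical stand-in for Kafka Streams' ValueAndTimestamp (not the real class).
    static final class ValueAndTimestamp<V> {
        final V value;
        final long timestamp;

        ValueAndTimestamp(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Models a materializer that wraps every value in ValueAndTimestamp,
    // regardless of which kind of store the caller asked for.
    static Map<String, Object> materialize(Map<String, Long> aggregates, long timestamp) {
        Map<String, Object> store = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : aggregates.entrySet()) {
            store.put(e.getKey(), new ValueAndTimestamp<>(e.getValue(), timestamp));
        }
        return store;
    }

    // Reader that assumes plain values: the cast fails on wrapped entries.
    static long sumAssumingPlainValues(Map<String, Object> store) {
        long sum = 0;
        for (Object v : store.values()) {
            sum += (Long) v; // ClassCastException if v is a ValueAndTimestamp
        }
        return sum;
    }

    // Defensive reader: unwraps the value if it arrives wrapped.
    static long sumUnwrapping(Map<String, Object> store) {
        long sum = 0;
        for (Object v : store.values()) {
            Object plain = (v instanceof ValueAndTimestamp)
                ? ((ValueAndTimestamp<?>) v).value
                : v;
            sum += (Long) plain;
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Long> aggregates = new LinkedHashMap<>();
        aggregates.put("a", 1L);
        aggregates.put("b", 2L);
        Map<String, Object> store = materialize(aggregates, 42L);

        System.out.println("unwrapped sum = " + sumUnwrapping(store));
        try {
            sumAssumingPlainValues(store);
        } catch (ClassCastException e) {
            System.out.println("plain read failed: " + e.getClass().getSimpleName());
        }
    }
}
```

The defensive reader is only a way to show where the type mismatch surfaces; it 
does not change which store type is actually created.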

 


> Timestamped store is used even if not desired
> ---------------------------------------------
>
>                 Key: KAFKA-10722
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10722
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1, 2.6.0
>            Reporter: fml2
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
