[ 
https://issues.apache.org/jira/browse/KAFKA-10722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17232993#comment-17232993
 ] 

Matthias J. Sax edited comment on KAFKA-10722 at 11/16/20, 6:55 PM:
--------------------------------------------------------------------

Every record in Kafka Streams has a timestamp, and aggregate() needs to set a 
timestamp for its output records. It computes each output record's timestamp as 
the maximum over all input records for that key. That is why it needs a 
timestamped key-value store to track the maximum timestamp.
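
As a rough illustration of why the store needs a timestamp, here is a minimal, self-contained Java model of one key's aggregation. The classes `TimestampedRecord`, `AggregateRow`, and `MaxTimestampAggregation` are hypothetical and not part of Kafka Streams. Summing stands in for an arbitrary aggregator. The point is only that the output timestamp is the running maximum of the input timestamps, and that maximum has to be stored alongside the aggregate.

```java
import java.util.List;

// Hypothetical input record: a value plus its timestamp in milliseconds.
final class TimestampedRecord {
    final long value;
    final long timestamp;

    TimestampedRecord(long value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical stored row for one key: the running aggregate plus the
// maximum input timestamp seen so far (the part a timestamped store keeps).
final class AggregateRow {
    final long aggregate;
    final long maxTimestamp;

    AggregateRow(long aggregate, long maxTimestamp) {
        this.aggregate = aggregate;
        this.maxTimestamp = maxTimestamp;
    }
}

final class MaxTimestampAggregation {
    // Folds one input record into the stored row for its key.
    // Summing is a placeholder aggregator; the timestamp rule is max(stored, input).
    static AggregateRow update(AggregateRow stored, TimestampedRecord input) {
        if (stored == null) {
            // First record for this key: its timestamp is the current maximum.
            return new AggregateRow(input.value, input.timestamp);
        }
        return new AggregateRow(
                stored.aggregate + input.value,
                Math.max(stored.maxTimestamp, input.timestamp));
    }

    // Processes the records in arrival order and returns the final row.
    static AggregateRow aggregateAll(List<TimestampedRecord> inputs) {
        AggregateRow row = null;
        for (TimestampedRecord record : inputs) {
            row = update(row, record);
        }
        return row;
    }
}
```

For inputs with values 1, 2, 4 and timestamps 5, 3, 9, the row ends with aggregate 7 and timestamp 9. The out-of-order record at timestamp 3 does not lower the maximum.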

Unfortunately, we cannot deprecate the API easily because of Java type 
erasure... I guess we could log a warning message, though. Feel free to open a 
PR for it. We could log when we create the 
`KeyValueToTimestampedKeyValueByteStoreAdapter`.

And yes, you always get a timestamped key-value store, and you can simplify your 
code accordingly. (Note, though, that if you provide a non-timestamped store, 
the timestamp won't really be stored. The adapter mentioned above drops the 
timestamp before storing the data in the provided store. On read, the adapter 
sets `-1`, i.e., unknown, as the timestamp.)
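
The adapter behavior described above can be sketched in plain Java. This is a hypothetical object-level model, not `KeyValueToTimestampedKeyValueByteStoreAdapter` itself, which works on serialized bytes. `ValueWithTimestamp` and `PlainStoreTimestampAdapter` are made-up names, and a `HashMap` stands in for the user-provided non-timestamped store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value+timestamp pair, loosely modeled on the idea of
// ValueAndTimestamp; not the Kafka class itself.
final class ValueWithTimestamp<V> {
    final V value;
    final long timestamp;

    ValueWithTimestamp(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical adapter: exposes a timestamped interface on top of a plain
// (non-timestamped) key-value store, modeled here as a HashMap.
final class PlainStoreTimestampAdapter<K, V> {
    // Returned on read because the real timestamp was never stored ("unknown").
    static final long UNKNOWN_TIMESTAMP = -1L;

    private final Map<K, V> plainStore = new HashMap<>();

    // Write path: the timestamp is dropped; only the value reaches the plain store.
    void put(K key, ValueWithTimestamp<V> valueAndTimestamp) {
        plainStore.put(key, valueAndTimestamp.value);
    }

    // Read path: the value is re-wrapped with -1 as its timestamp.
    ValueWithTimestamp<V> get(K key) {
        V value = plainStore.get(key);
        if (value == null) {
            return null;
        }
        return new ValueWithTimestamp<>(value, UNKNOWN_TIMESTAMP);
    }
}
```

Writing a value with timestamp 42 and reading it back yields the same value with timestamp -1. That is the loss of information described above.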

Thus, I would actually recommend passing in a timestamped key-value store to 
begin with.
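
In the reporter's topology, one way to follow this recommendation would be to materialize with `Stores.persistentTimestampedKeyValueStore("MyStore")` instead of `Stores.persistentKeyValueStore("MyStore")`, and to treat the store in the processor as a `TimestampedKeyValueStore`, whose values are wrapped in `ValueAndTimestamp`. The sketch below models only the processor side, without Kafka dependencies. `StoredValue` is a hypothetical stand-in for `ValueAndTimestamp`, a `Map` stands in for the store, and `StoreReader` is a made-up helper. Code that assumes plain values fails with a `ClassCastException`, while code that unwraps each value works.

```java
import java.util.Map;

// Hypothetical stand-in for ValueAndTimestamp: what a timestamped store returns.
final class StoredValue<V> {
    final V value;
    final long timestamp;

    StoredValue(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class StoreReader {
    // Mirrors the failing processor: it assumes every stored value is a plain
    // Long. With wrapped values, the cast throws ClassCastException.
    static long sumAssumingPlainValues(Map<String, ?> store) {
        long sum = 0;
        for (Object v : store.values()) {
            sum += (Long) v;
        }
        return sum;
    }

    // Simplified code once the store is known to be timestamped:
    // unwrap each value (the timestamp stays available if needed).
    static long sumUnwrapped(Map<String, StoredValue<Long>> store) {
        long sum = 0;
        for (StoredValue<Long> v : store.values()) {
            sum += v.value;
        }
        return sum;
    }

    // Example use of the stored timestamps: the latest one across all keys.
    static long latestTimestamp(Map<String, StoredValue<Long>> store) {
        long latest = -1L;
        for (StoredValue<Long> v : store.values()) {
            latest = Math.max(latest, v.timestamp);
        }
        return latest;
    }
}
```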

I am also open to improving our docs to point out this issue more clearly. At 
the moment, it seems we only documented it in the upgrade guide, when the 
feature was added: 
https://kafka.apache.org/26/documentation/streams/upgrade-guide#streams_api_changes_230


> Timestamped store is used even if not desired
> ---------------------------------------------
>
>                 Key: KAFKA-10722
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10722
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1, 2.6.0
>            Reporter: fml2
>            Priority: Major
>
> I have a stream which I then group and aggregate (this results in a KTable). 
> When aggregating, I explicitly tell it to materialize the result table using a 
> regular (non-timestamped) store.
> After that, the KTable is filtered and streamed. This stream is processed by 
> a processor that accesses the store.
> The problem/bug is that even though I specify a non-timestamped store, a 
> timestamped one is used. This leads to a ClassCastException in the processor 
> (it iterates over the store and expects the items to be of type "KeyValue", 
> but they are of type "ValueAndTimestamp").
> Here is the code (schematically).
> First, I define the topology:
> {code:java}
> KTable table = ...aggregate(
>   initializer, // initializer for the KTable row
>   aggregator, // aggregator
>   Materialized.as(Stores.persistentKeyValueStore("MyStore")) // <-- Non-Timestamped!
>     .withKeySerde(...).withValueSerde(...));
> table.toStream().process(theProcessor);
> {code}
> In the class for the processor:
> {code:java}
> public void init(ProcessorContext context) {
>    var store = context.getStateStore("MyStore"); // Returns a TimestampedKeyValueStore!
> }
> {code}
> A timestamped store is returned even though I explicitly specified a 
> non-timestamped one!
>  
> I tried to find the cause for this behaviour and think that I've found it. It 
> lies in this line: 
> [https://github.com/apache/kafka/blob/cfc813537e955c267106eea989f6aec4879e14d7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L241]
> There, TimestampedKeyValueStoreMaterializer is used regardless of whether the 
> materialization store supplier is a timestamped one or not.
> I think this is a bug.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
