[ https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168977#comment-17168977 ]

John Roesler commented on KAFKA-10322:
--------------------------------------

Hi all,

Thanks for your insights, [~ableegoldman]!

I realized after reading your comment that I'd never written down the full 
extent of the idea you referenced. I've just documented it here: 
https://issues.apache.org/jira/browse/KAFKA-10336

It seems like we could use the same general mechanism to provide an upgrade 
path to a version of Streams that solves this issue.

Thanks,

-John

> InMemoryWindowStore restore keys format incompatibility (lack of 
> sequenceNumber in keys on topic)
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10322
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10322
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>         Environment: windows/linux
>            Reporter: Tomasz Bradło
>            Priority: Major
>
> I have a regular groupBy & count stream configuration:
> {code:java}
>     fun addStream(kStreamBuilder: StreamsBuilder) {
>         // In-memory window store: 10-day retention, 1-day window size, duplicates not retained
>         val storeSupplier = Stores.inMemoryWindowStore("count-store",
>                 Duration.ofDays(10),
>                 Duration.ofDays(1),
>                 false)
>         val storeBuilder: StoreBuilder<WindowStore<CountableEvent, Long>> =
>                 Stores.windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long())
>         // Count records per key in 1-day windows and write the counts to "topic1-count"
>         kStreamBuilder
>                 .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>                 .map { _, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null) }
>                 .groupByKey()
>                 .windowedBy(TimeWindows.of(Duration.ofDays(1)))
>                 .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long()))
>                 .toStream()
>                 .to("topic1-count")
>         // Read "topic1-count" back into a global store, deserializing the windowed key with a 1-day window size
>         val storeConsumed = Consumed.with(
>                 WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis()),
>                 Serdes.Long())
>         kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier)
>     }
> {code}
> When writing to "topic1-count", the key is serialized by
> [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java],
> which uses
> [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112],
> so the message key format is:
> {code:java}
> real_grouping_key + timestamp(8bytes){code}
>  
> Everything works, and I get correct values from the state store. But in a recovery
> scenario, when [GlobalStateManagerImpl|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317]
> enters the offset < highWatermark loop, the
> [InMemoryWindowStore stateRestoreCallback|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105]
> reads from "topic1-count" and fails to extract a valid key and timestamp using
> [WindowKeySchema.extractStoreKeyBytes|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188]
> and
> [WindowKeySchema.extractStoreTimestamp|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201].
> It fails because it expects the format:
> {code:java}
> real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code}
> How is this supposed to work in this case?
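A minimal sketch of the mismatch described in the issue, in Kotlin to match the reporter's snippet. The helpers `topicKey`, `storeKey`, `extractKeyAssumingSeqnum` and `extractTimestampAssumingSeqnum` are hypothetical: they re-implement the two byte layouts quoted above with `java.nio.ByteBuffer` and are not Kafka's `WindowKeySchema` code. The point they illustrate is that parsing a topic key (which has no 4-byte sequence number) with offsets that assume one drops the last 4 bytes of the real key and reads a "timestamp" that straddles key bytes and timestamp bytes.

```kotlin
import java.nio.ByteBuffer

// Sizes taken from the two key formats quoted in the issue.
val TIMESTAMP_SIZE = 8
val SEQNUM_SIZE = 4

// Hypothetical: topic key layout, real_grouping_key + timestamp(8 bytes).
fun topicKey(key: ByteArray, windowStart: Long): ByteArray =
    ByteBuffer.allocate(key.size + TIMESTAMP_SIZE)
        .put(key)
        .putLong(windowStart)
        .array()

// Hypothetical: store key layout, real_grouping_key + timestamp(8 bytes) + sequence_number(4 bytes).
fun storeKey(key: ByteArray, windowStart: Long, seqnum: Int): ByteArray =
    ByteBuffer.allocate(key.size + TIMESTAMP_SIZE + SEQNUM_SIZE)
        .put(key)
        .putLong(windowStart)
        .putInt(seqnum)
        .array()

// Hypothetical: parse the key portion assuming a trailing timestamp + seqnum (12 bytes).
fun extractKeyAssumingSeqnum(binaryKey: ByteArray): ByteArray =
    binaryKey.copyOfRange(0, binaryKey.size - TIMESTAMP_SIZE - SEQNUM_SIZE)

// Hypothetical: read the 8-byte timestamp that sits just before an assumed 4-byte seqnum.
fun extractTimestampAssumingSeqnum(binaryKey: ByteArray): Long =
    ByteBuffer.wrap(binaryKey).getLong(binaryKey.size - TIMESTAMP_SIZE - SEQNUM_SIZE)
```

Under these assumptions, a store-layout key round-trips, while a 15-byte topic key for `"user-42"` parses as the key `"use"` with a meaningless timestamp. A key shorter than 4 bytes (e.g. `"ab"`) yields a negative key length, so the parser throws instead of returning garbage.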



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
