[ https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168977#comment-17168977 ]
John Roesler commented on KAFKA-10322:
--------------------------------------

Hi all,

Thanks for your insights, [~ableegoldman]!

I realized after reading your comment that I'd never written down the full extent of the idea you referenced. I've just documented it here: https://issues.apache.org/jira/browse/KAFKA-10336

It seems like we could use the same general mechanism to provide an upgrade path to a version of Streams that solves this issue.

Thanks,
-John

> InMemoryWindowStore restore keys format incompatibility (lack of
> sequenceNumber in keys on topic)
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10322
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10322
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>         Environment: windows/linux
>            Reporter: Tomasz Bradło
>            Priority: Major
>
> I have a regular groupBy & count stream configuration:
> {code:java}
> fun addStream(kStreamBuilder: StreamsBuilder) {
>     val storeSupplier = Stores.inMemoryWindowStore("count-store",
>         Duration.ofDays(10),
>         Duration.ofDays(1),
>         false)
>     val storeBuilder: StoreBuilder<WindowStore<CountableEvent, Long>> = Stores
>         .windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long())
>
>     kStreamBuilder
>         .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>         .map { _, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null) }
>         .groupByKey()
>         .windowedBy(TimeWindows.of(Duration.ofDays(1)))
>         .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long()))
>         .toStream()
>         .to("topic1-count")
>
>     val storeConsumed = Consumed.with(
>         WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis()),
>         Serdes.Long())
>     kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier)
> }{code}
> While sending
> to "topic1-count", the key is serialized by
> [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java],
> which uses
> [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112],
> so the message key format is:
> {code:java}
> real_grouping_key + timestamp(8bytes){code}
>
> Everything works, and I can get correct values from the state store. But in a
> recovery scenario, when
> [GlobalStateManagerImpl|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317]
> enters the offset < highWatermark loop, the
> [InMemoryWindowStore stateRestoreCallback|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105]
> reads from "topic1-count" and fails to extract a valid key and timestamp using
> [WindowKeySchema.extractStoreKeyBytes|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188]
> and
> [WindowKeySchema.extractStoreTimestamp|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201].
> It fails because it expects the format:
> {code:java}
> real_grouping_key + timestamp(8bytes) + sequence_number(4bytes){code}
> How is this supposed to work in this case?
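To make the key-format mismatch described in this issue concrete, here is a minimal Java sketch. The class `WindowKeyLayouts` and all of its methods are hypothetical: they only mimic the two byte layouts quoted in the issue (grouping key + 8-byte timestamp on the topic; grouping key + 8-byte timestamp + 4-byte sequence number in the store) and are not Kafka's `WindowKeySchema`. The parsing helpers assume, as the issue describes, that the restore path locates the timestamp by counting 12 bytes back from the end of the key.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of the two key layouts described in KAFKA-10322.
// These helpers only imitate the byte layouts; they are NOT Kafka's WindowKeySchema.
public class WindowKeyLayouts {
    static final int TIMESTAMP_SIZE = 8; // long window-start timestamp
    static final int SEQNUM_SIZE = 4;    // int sequence number (store layout only)

    // Topic layout, as written when the windowed key is serialized: key + timestamp
    static byte[] topicKey(byte[] key, long windowStart) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE)
                .put(key)
                .putLong(windowStart)
                .array();
    }

    // Store layout, as expected on restore: key + timestamp + seqnum
    static byte[] storeKey(byte[] key, long windowStart, int seqnum) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
                .put(key)
                .putLong(windowStart)
                .putInt(seqnum)
                .array();
    }

    // Reads the timestamp assuming a trailing 4-byte seqnum is present
    static long timestampAssumingSeqnum(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    // Extracts the grouping key assuming a trailing 4-byte seqnum is present
    static byte[] keyAssumingSeqnum(byte[] binaryKey) {
        return Arrays.copyOf(binaryKey, binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(String[] args) {
        byte[] key = "{\"id\":\"event-1\"}".getBytes(StandardCharsets.UTF_8); // 16 bytes
        long windowStart = 1_596_240_000_000L;

        byte[] fromStore = storeKey(key, windowStart, 0);
        byte[] fromTopic = topicKey(key, windowStart);

        System.out.println(timestampAssumingSeqnum(fromStore) == windowStart); // true
        System.out.println(timestampAssumingSeqnum(fromTopic) == windowStart); // false
        // The last 4 key bytes are misread as part of the timestamp
        System.out.println(new String(keyAssumingSeqnum(fromTopic), StandardCharsets.UTF_8)); // {"id":"event
    }
}
```

Under these assumptions, the store-layout key round-trips, while the topic-layout key yields a wrong timestamp and a grouping key truncated by 4 bytes. With a grouping key shorter than 4 bytes, the computed offset becomes negative and the parse throws instead of silently misreading.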