[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327992#comment-16327992 ]

Matthias J. Sax commented on KAFKA-6269:

The release plan for v1.0.1 was proposed today: https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+1.0.1

> KTable state restore fails after rebalance
> ------------------------------------------
>
>                 Key: KAFKA-6269
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6269
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andreas Schroeder
>            Assignee: Bill Bejeck
>            Priority: Blocker
>             Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde,
> valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                   sourceTopic: String,
>                   existsTopic: String,
>                   valueSerde: Serde[V],
>                   valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>     .withKeySerde(Serdes.String())
>     .withValueSerde(valueSerde)
>     .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
>
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)
> val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30
> partitions so that there is a total of 4 x 30 = 120 partitions to consume.
> The initial launch of the processor works fine, but killing one
> processor and letting it re-join the stream threads leads to some faulty
> behaviour.
> First, the total number of assigned partitions over all processor machines is
> larger than 120 (sometimes 157, sometimes just 132), so the partition / task
> assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly
> with the error message 'Detected a task that got migrated to another
> thread.' We gave the processor half an hour to recover; usually, rebuilding
> the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got
> migrated to another thread. This implies that this thread missed a rebalance
> and dropped out of the consumer group. Trying to rejoin the consumer group
> now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
> > StreamsTask taskId: 1_0
> > >   ProcessorTopology:
> >       KSTREAM-SOURCE-08:
> >           topics: [entity-A-exists]
> >           children: [KTABLE-SOURCE-09]
> >       KTABLE-SOURCE-09:
> >           states: [entity-A-exists-persisted]
> >           children: [KTABLE-JOINTHIS-11]
> >       KTABLE-JOINTHIS-11:
> >           states: [entity-B-exists-persisted]
> >           children: [KTABLE-MERGE-10]
> >       KTABLE-MERGE-10:
> >           states: [entity-A-joined-with-entity-B]
> >       KSTREAM-SOURCE-03:
> >           topics: [entity-B-exists]
> >
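The error quoted in the report ("Log end offset of entity-B-exists-0 should not change while restoring") can be pictured with a toy model. The sketch below is not Kafka code: `MovingEndOffsetDemo`, `Log`, and `endOffsetMovedDuringRestore` are hypothetical names. It also assumes something the report does not state explicitly, namely that a table built with `withLoggingDisabled()` is restored from the topic it reads, and that the same application keeps writing to that topic during the restore.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not Kafka code: restoring a store from a topic that is still being written. */
final class MovingEndOffsetDemo {

    /** Hypothetical stand-in for one topic partition; the list index plays the role of the offset. */
    static final class Log {
        private final List<String> records = new ArrayList<>();

        void append(String value) {
            records.add(value);
        }

        /** Offset that the next appended record would get. */
        long endOffset() {
            return records.size();
        }

        String read(long offset) {
            return records.get((int) offset);
        }
    }

    /**
     * Copies records [0, capturedEnd) into a store while a writer may keep appending.
     * Returns true if the end offset differs from the one captured before the restore started.
     */
    static boolean endOffsetMovedDuringRestore(Log log, Runnable concurrentWriter) {
        final long capturedEnd = log.endOffset();
        final List<String> store = new ArrayList<>();
        for (long offset = 0; offset < capturedEnd; offset++) {
            store.add(log.read(offset));
            concurrentWriter.run(); // assumption: upstream processing keeps producing
        }
        return log.endOffset() != capturedEnd;
    }

    public static void main(String[] args) {
        Log idle = new Log();
        idle.append("k1");
        idle.append("k2");
        System.out.println("idle topic moved: " + endOffsetMovedDuringRestore(idle, () -> { }));

        Log busy = new Log();
        busy.append("k1");
        busy.append("k2");
        System.out.println("busy topic moved: " + endOffsetMovedDuringRestore(busy, () -> busy.append("new")));
    }
}
```

In this model a dedicated changelog that only the restoring task writes to behaves like the idle case, while a topic fed by an upstream `to(...)` step behaves like the busy case, so a strict "end offset must not change" rule would trip on the latter.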
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327592#comment-16327592 ]

Matthias J. Sax commented on KAFKA-6269:

I would assume that branch `1.0` (https://github.com/apache/kafka/tree/1.0) should be quite stable; it only contains bug fixes for `1.0.1`. As an alternative, you can also check out the `1.0.0` commit (https://github.com/apache/kafka/tree/1.0.0) and cherry-pick the fix.

Btw: Brokers are always backward compatible, so yes, KS 0.11.0.1 will work with `1.0.0` brokers.
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327439#comment-16327439 ]

Bill Bejeck commented on KAFKA-6269:

[~dminkovsky], are you good now with the 1.1.0-SNAPSHOT build?
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327259#comment-16327259 ]

Dmitry Minkovsky commented on KAFKA-6269:

I'm being hit by this and wondering what to do.
* I upgraded my brokers to 1.0.0. Will Kafka Streams 0.11.0.1 work with a 1.0.0 broker?
* I'm fine building from trunk. Could you recommend a "stable" commit for me to use?

With regard to a workaround for 1.0.0, I am confused about the null masking option, and I am also confused about why groupByKey().aggregate() will work.
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308416#comment-16308416 ]

ASF GitHub Bot commented on KAFKA-6269:

guozhangwang closed pull request #4300: KAFKA-6269: KTable restore fails after rebalance
URL: https://github.com/apache/kafka/pull/4300

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 10aedbb93c4..d9085b0697f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -197,7 +197,7 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
             throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
         List<ConsumerRecord<K, V>> recs = this.records.get(tp);
         if (recs == null) {
-            recs = new ArrayList<ConsumerRecord<K, V>>();
+            recs = new ArrayList<>();
             this.records.put(tp, recs);
         }
         recs.add(record);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 178d2bb96d0..ba17ce95ede 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -96,7 +96,6 @@ public void register(final StateRestorer restorer) {
             restoreConsumer.seekToBeginning(partitions);
         }
 
-
         if (needsRestoring.isEmpty()) {
             restoreConsumer.unsubscribe();
         }
@@ -174,8 +173,8 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
                 restoreConsumer.seek(restorer.partition(), restorer.checkpoint());
                 logRestoreOffsets(restorer.partition(),
-                        restorer.checkpoint(),
-                        endOffsets.get(restorer.partition()));
+                                  restorer.checkpoint(),
+                                  endOffsets.get(restorer.partition()));
                 restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
                 restorer.restoreStarted();
             } else {
@@ -187,8 +186,8 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
         for (final StateRestorer restorer : needsPositionUpdate) {
             final long position = restoreConsumer.position(restorer.partition());
             logRestoreOffsets(restorer.partition(),
-                    position,
-                    endOffsets.get(restorer.partition()));
+                              position,
+                              endOffsets.get(restorer.partition()));
             restorer.setStartingOffset(position);
             restorer.restoreStarted();
         }
@@ -252,7 +251,7 @@ private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
         final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
         restorer.setRestoredOffset(pos);
         if (restorer.hasCompleted(pos, endOffset)) {
-            if (pos > endOffset + 1) {
+            if (pos > endOffset) {
                 throw new TaskMigratedException(task, topicPartition, endOffset, pos);
             }
 
@@ -271,30 +270,40 @@ private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
                              final StateRestorer restorer,
                              final Long endOffset) {
         final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
-        long offset = -1;
-
+        long nextPosition = -1;
+        int numberRecords = records.size();
+        int numberRestored = 0;
         for (final ConsumerRecord<byte[], byte[]> record : records) {
-            offset = record.offset();
+            final long offset = record.offset();
             if (restorer.hasCompleted(offset, endOffset)) {
+                nextPosition = record.offset();
                 break;
             }
+            numberRestored++;
             if (record.key() != null) {
                 restoreRecords.add(KeyValue.pair(record.key(), record.value()));
             }
         }
 
-        if (offset == -1) {
-            offset =
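The functional change in `restorePartition` is the boundary check, `pos > endOffset + 1` becoming `pos > endOffset`. A minimal sketch of the difference follows; `EndOffsetCheck` and its two methods are hypothetical helpers that only mirror the two conditions from the diff, and the value 11 is an arbitrary example, not taken from the failing cluster.

```java
/** Toy comparison of the two boundary checks shown in the diff; not the Kafka class itself. */
final class EndOffsetCheck {

    /** Condition before the change: a position one past the end offset was still tolerated. */
    static boolean exceededBeforeFix(long pos, long endOffset) {
        return pos > endOffset + 1;
    }

    /** Condition after the change: any position past the end offset counts as exceeded. */
    static boolean exceededAfterFix(long pos, long endOffset) {
        return pos > endOffset;
    }

    public static void main(String[] args) {
        final long endOffset = 11L; // hypothetical example value
        for (long pos = endOffset; pos <= endOffset + 2; pos++) {
            System.out.printf("pos=%d before=%b after=%b%n",
                    pos, exceededBeforeFix(pos, endOffset), exceededAfterFix(pos, endOffset));
        }
    }
}
```

The two conditions disagree only at `pos == endOffset + 1`; everywhere else they give the same answer.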
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16300940#comment-16300940 ]

ASF GitHub Bot commented on KAFKA-6269:

mjsax commented on a change in pull request #4300: KAFKA-6269: KTable restore fails after rebalance
URL: https://github.com/apache/kafka/pull/4300#discussion_r158426654

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
##
@@ -385,6 +379,130 @@ public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore
         } catch (final TaskMigratedException expected) { /* ignore */ }
     }
 
+    @Test
+    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() {
+        final int totalMessages = 10;
+        assignPartition(totalMessages, topicPartition);
+        // records 0..4
+        addRecords(5, topicPartition, 0);
+        //EOS enabled commit marker at offset 5 so rest of records 6..10
+        addRecords(5, topicPartition, 6);
+        consumer.assign(Collections.emptyList());
+
+        // end offsets should start after commit marker of 5 from above
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 6L));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+        try {
+            changelogReader.restore(active);
+            fail("Should have thrown task migrated exception");
+        } catch (final TaskMigratedException expected) {
+            /* ignore */
+        }
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() {
+        final int totalMessages = 10;
+        setupConsumer(totalMessages, topicPartition);
+        // records have offsets of 0..9 10 is commit marker so 11 is end offset
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
+
+        consumer.assign(Collections.emptyList());
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(10));
+    }
+
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSDisabled() {
+        final int totalMessages = 10;
+        setupConsumer(totalMessages, topicPartition);
+
+        consumer.assign(Collections.emptyList());
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(10));
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopic() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L));

Review comment:
I just thought about this. I think `endOffset` should be the actual end offset, i.e., `11` for this test; we pass in the `offsetLimit` as 5 in `StateRestorer` below.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
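The offsets mentioned in the test comments and in the review remark can be worked through with a small calculation. This is a sketch only: `RestoreBounds`, `endOffset`, and `restoreUpTo` are hypothetical names, and combining the end offset with `offsetLimit` through `min` is an assumption made for illustration, not something shown in the quoted diff.

```java
/** Toy arithmetic for the offsets discussed in the review; not part of Kafka. */
final class RestoreBounds {

    /**
     * End offset of a partition holding recordCount records at offsets 0..recordCount-1,
     * optionally followed by one transaction commit marker that occupies the next offset.
     */
    static long endOffset(int recordCount, boolean trailingCommitMarker) {
        return recordCount + (trailingCommitMarker ? 1L : 0L);
    }

    /** Assumed rule: restoring stops at the smaller of the actual end offset and the caller's limit. */
    static long restoreUpTo(long endOffset, long offsetLimit) {
        return Math.min(endOffset, offsetLimit);
    }

    public static void main(String[] args) {
        // 10 records at offsets 0..9, commit marker at 10, so the end offset is 11.
        final long end = endOffset(10, true);
        System.out.println("end offset with marker: " + end);
        // With a limit of 5 the restore bound is 5, although the log really ends at 11.
        System.out.println("bound with offsetLimit 5: " + restoreUpTo(end, 5L));
        // With no effective limit the bound is the end offset itself.
        System.out.println("bound without limit: " + restoreUpTo(end, Long.MAX_VALUE));
    }
}
```

Keeping the two numbers separate, as the review suggests, lets a test state the real end of the log (11) while the limit (5) alone decides where restoring stops.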
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues-test.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265588#comment-16265588 ]

Matthias J. Sax commented on KAFKA-6269:

[~stevenschlansker] Thanks for your comment, but this is not the official Jira board. Please comment on https://issues.apache.org/jira/browse/KAFKA-6269 instead. Thx.
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues-test.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265581#comment-16265581 ]

Steven Schlansker commented on KAFKA-6269:
------------------------------------------

We just hit a similar issue during a rollout where our application repeatedly refused to start, giving error messages like the ones above. Looking forward to a fix in the next release.

> KTable state restore fails after rebalance
> ------------------------------------------
>
>                 Key: KAFKA-6269
>                 URL: https://issues-test.apache.org/jira/browse/KAFKA-6269
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andreas Schroeder
>            Priority: Blocker
>             Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                   sourceTopic: String,
>                   existsTopic: String,
>                   valueSerde: Serde[V],
>                   valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>     .withKeySerde(Serdes.String())
>     .withValueSerde(valueSerde)
>     .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)
> val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 partitions so that there is a total of 4 x 30 = 120 partitions to consume.
> The initial launch of the processor works fine, but after killing one processor and letting it re-join, the stream threads show some faulty behaviour.
> First, the total number of assigned partitions over all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition / task assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly with the error message of 'Detected a task that got migrated to another thread.' We gave the processor half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
> > StreamsTask taskId: 1_0
> > > ProcessorTopology:
> >   KSTREAM-SOURCE-08:
> >     topics: [entity-A-exists]
> >     children: [KTABLE-SOURCE-09]
> >   KTABLE-SOURCE-09:
> >     states: [entity-A-exists-persisted]
> >     children: [KTABLE-JOINTHIS-11]
> >   KTABLE-JOINTHIS-11:
> >     states: [entity-B-exists-persisted]
> >     children: [KTABLE-MERGE-10]
> >   KTABLE-MERGE-10:
> >     states: [entity-A-joined-with-entity-B]
> >   KSTREAM-SOURCE-03:
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16280993#comment-16280993 ]

ASF GitHub Bot commented on KAFKA-6269:
---------------------------------------

GitHub user bbejeck opened a pull request:

    https://github.com/apache/kafka/pull/4300

    KAFKA-6269[WIP] DO NOT MERGE attempts to recreate original error

    *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

    *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

    ### Committer Checklist (excluded from commit message)
    - [ ] Verify design and implementation
    - [ ] Verify test coverage and CI build status
    - [ ] Verify documentation (including upgrade notes)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbejeck/kafka KAFKA_6269_ktable_restore_fails_after_rebalance

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/4300.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4300

commit 107a213af4f1106857bbc0b60dabb0bf5ecd3cc0
Author: Bill Bejeck
Date:   2017-12-06T21:57:58Z

    KAFKA-6269[WIP] attempts to recreate original error

> KTable state restore fails after rebalance
> ------------------------------------------
>
>                 Key: KAFKA-6269
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6269
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andreas Schroeder
>            Assignee: Bill Bejeck
>            Priority: Blocker
>             Fix For: 1.1.0, 1.0.1
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16272972#comment-16272972 ]

Andreas Schroeder commented on KAFKA-6269:
------------------------------------------

[~guozhang] it's okay for my team to wait for the 1.0.1 release. Until then, we'll stick to the 0.11.0.1 version we are currently using. The reason to migrate to 1.0.0 was that we are experiencing some unfair task assignment across our stream processor nodes, which leads to some nodes crashing (and immediately being recreated). So our current system runs and we can wait for 1.0.1.

Thanks, however, for giving suggestions on how to proceed! I'll try [~mjsax]'s suggestion on hiding the null value :)
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16272013#comment-16272013 ]

Matthias J. Sax commented on KAFKA-6269:
----------------------------------------

About the workaround: you could mask {{null}} with a dummy value: {{stream.mapValues(/* replace null with custom NULL */).groupByKey().aggregate()}}.
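The idea behind the masking workaround can be illustrated with a small plain-Scala model. This is only a sketch under stated assumptions, not Kafka Streams code: `aggregateLatest` stands in for an aggregation that skips records with a null value, `None` models a null value, and the sentinel `Tombstone` as well as all other names are hypothetical.

```scala
object NullMaskSketch {
  // A record is (key, value); None models a null value, i.e. a delete.
  type Record = (String, Option[String])

  // Hypothetical sentinel; it must never collide with a real value.
  val Tombstone: String = "__NULL__"

  /** Models an aggregation that keeps the latest value per key and
    * silently skips records whose value is null. */
  def aggregateLatest(records: Seq[Record]): Map[String, String] =
    records.foldLeft(Map.empty[String, String]) {
      case (table, (key, Some(value))) => table.updated(key, value)
      case (table, (_, None))          => table // null value: record is ignored
    }

  /** Replaces every null value with the sentinel so the record survives. */
  def maskNulls(records: Seq[Record]): Seq[Record] =
    records.map { case (key, value) => (key, Some(value.getOrElse(Tombstone))) }

  /** Downstream view: keys whose latest value is not the sentinel. */
  def liveKeys(table: Map[String, String]): Set[String] =
    table.collect { case (key, value) if value != Tombstone => key }.toSet

  def main(args: Array[String]): Unit = {
    val records: Seq[Record] = Seq(("a", Some("")), ("b", Some("")), ("a", None))
    // Without masking, the delete of "a" is lost.
    println(aggregateLatest(records).keySet)
    // With masking, the delete is visible as the sentinel.
    println(liveKeys(aggregateLatest(maskNulls(records))))
  }
}
```

In this model the unmasked aggregation still reports key `a` after its delete, while the masked variant carries the delete through as the sentinel, which downstream logic then has to interpret explicitly.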
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16271399#comment-16271399 ]

Guozhang Wang commented on KAFKA-6269:
--------------------------------------

[~andreas-schroeder] We have spotted the regression bug introduced in 1.0.0 and we are working on a fix that will be included in the coming 1.0.1 release. But the bug-fix release may only come in early December, and I'm wondering how much this issue is blocking your team's progress. Would you be OK with compiling from the {{1.0}} branch directly once we have the bug fix in?
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269058#comment-16269058 ]

Andreas Schroeder commented on KAFKA-6269:
------------------------------------------

Hi again [~mjsax], I'm back with some news (finally): the issue we are having is that records with a null value are ignored. Deletes therefore don't propagate to the outer join, and our business logic doesn't work any more. See the [KGroupedStream API docs|http://apache.mirror.digionline.de/kafka/1.0.0/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Materialized)].

Any other ideas? :)
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266491#comment-16266491 ]

Andreas Schroeder commented on KAFKA-6269:
------------------------------------------

Hi [~mjsax], thanks for the swift response and your workaround proposal. I tried to make that workaround work, but it breaks some of my integration tests. I'm still investigating my tests to determine if this workaround is feasible. I'll keep you posted.
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266321#comment-16266321 ] Matthias J. Sax commented on KAFKA-6269: In the PR linked below, we changed the {{StoreChangelogReader}}. The change in the PR was a fix for regular restore case if EOS is enabled. Because of commit markers in the topic, using the record offset as "end offset" to check if restore is finished is not save and might end up in infinite loop (can dig out the JIRA if you wish). Thus, we return {{restoreConsumer.position()}} instead. However, this is wrong for the source-KTable case when the guard fires (originally, we set {{offset = record.offset}} if guard fires, but remove this line). Thus, even if we stop restore correctly, we return the wrong offset that is larger then expected end-offset and thus we get the exception. https://github.com/apache/kafka/commit/eaabb6cd0173c4f6854eb5da39194a7e3fc0162c#diff-46ed6d177221c8778965ecb1b6657be3R264 I was thinking about a potential fix. A straight forward fix would not work: returning eagerly if the guard condition is met, and return {{record.offset()}} would yield the EOS issue again. But I think, we could return the offset of the next record (the first one that we do not restore anymore), or {{restoreConsumer.position()}} if no next record exist (ie, if nobody actually added records to the source-KTable topic. WDYT? > KTable state restore fails after rebalance > -- > > Key: KAFKA-6269 > URL: https://issues.apache.org/jira/browse/KAFKA-6269 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Andreas Schroeder >Priority: Blocker > Fix For: 1.1.0, 1.0.1 > > > I have the following kafka streams topology: > entity-B -> map step -> entity-B-exists (with state store) > entity-A -> map step -> entity-A-exists (with state store) > (entity-B-exists, entity-A-exists) -> outer join with state store. 
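The fix proposed in the comment above (report the offset of the first record that is no longer restored, or fall back to the consumer position when there is none) can be sketched as follows. This is only an illustrative model, not the actual {{StoreChangelogReader}} code: {{Rec}}, {{restoreBatch}}, and the {{consumerPosition}} parameter (standing in for {{restoreConsumer.position()}}) are hypothetical names, and the guard is assumed to stop at the first record whose offset is not below the end offset captured when the restore started.

```scala
object RestoreOffsetSketch {
  // Hypothetical stand-in for a consumer record; only the offset matters here.
  final case class Rec(offset: Long, value: String)

  /**
   * Restores the records of one polled batch up to (excluding) `endOffset`.
   *
   * Returns the records that were restored and the offset to report for the
   * "restore finished" check:
   *  - the offset of the first record that is NOT restored, if there is one;
   *  - otherwise `consumerPosition` (assumed to model restoreConsumer.position()),
   *    which also accounts for commit markers that are never delivered as records.
   */
  def restoreBatch(batch: Seq[Rec], endOffset: Long, consumerPosition: Long): (Seq[Rec], Long) = {
    // Guard: stop at the first record at or beyond the end offset captured at restore start.
    val (toRestore, beyondEnd) = batch.span(_.offset < endOffset)
    val reportedOffset = beyondEnd.headOption.map(_.offset).getOrElse(consumerPosition)
    (toRestore, reportedOffset)
  }

  def main(args: Array[String]): Unit = {
    // Source-KTable case: an upstream processor appended offsets 3 and 4 after the
    // end offset (3) was captured, so the consumer position (5) is past the end offset.
    val batch = Seq(Rec(0L, "a"), Rec(1L, "b"), Rec(2L, "c"), Rec(3L, "d"), Rec(4L, "e"))
    val (restored, reported) = restoreBatch(batch, endOffset = 3L, consumerPosition = 5L)
    println(s"restored=${restored.map(_.offset)} reported=$reported")
  }
}
```

In the source-KTable case the reported offset stays at the captured end offset instead of jumping to the consumer position, while in the EOS case with a trailing commit marker (no record beyond the end offset) the consumer position is still used.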
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266157#comment-16266157 ]

Guozhang Wang commented on KAFKA-6269:
--------------------------------------

[~mjsax] What bug have you found that relates to the guard not being effective?

> KTable state restore fails after rebalance
> ------------------------------------------
>
>                 Key: KAFKA-6269
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6269
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andreas Schroeder
>            Priority: Blocker
>             Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                   sourceTopic: String,
>                   existsTopic: String,
>                   valueSerde: Serde[V],
>                   valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>     .withKeySerde(Serdes.String())
>     .withValueSerde(valueSerde)
>     .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)
> val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 partitions so that there is a total of 4 x 30 = 120 partitions to consume.
> The initial launch of the processor works fine, but when we kill one processor and let it re-join, the stream threads show some faulty behaviour.
> First, the total number of assigned partitions over all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition / task assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly with the error message of 'Detected a task that got migrated to another thread.' We gave the processor half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
> > StreamsTask taskId: 1_0
> > > ProcessorTopology:
> > KSTREAM-SOURCE-08:
> > topics: [entity-A-exists]
> > children: [KTABLE-SOURCE-09]
> > KTABLE-SOURCE-09:
> > states: [entity-A-exists-persisted]
> > children: [KTABLE-JOINTHIS-11]
> > KTABLE-JOINTHIS-11:
> > states: [entity-B-exists-persisted]
> > children: [KTABLE-MERGE-10]
> > KTABLE-MERGE-10:
> > states: [entity-A-joined-with-entity-B]
> > KSTREAM-SOURCE-03:
> > topics: [entity-B-exists]
> >
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264926#comment-16264926 ]

Matthias J. Sax commented on KAFKA-6269:

I had a look at the code, and I think I understand the issue. The guard is not working as expected (which was my suspicion). The only workaround I can think of at the moment is to read the table source topics as a {{KStream}} and do a dummy aggregation that only keeps the latest value, transforming the {{KStream}} into a {{KTable}}. This will create a proper changelog topic and avoid hitting this issue.

For your special case, since you write the topic with {{to()}}, you could also replace the {{to()}} with a {{groupByKey().aggregate()}}, and you would not need the {{entity-X-exists}} topics anymore. This should be a no-overhead solution for you: there will be no repartitioning topic, because you only call {{mapValues}} before the {{groupByKey().aggregate()}}.
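The "dummy aggregation that only keeps the latest value" from the comment above can be illustrated without any Kafka dependency. The sketch below is a plain-Scala model of the idea only, not Kafka Streams code: {{keepLatest}} and {{toTable}} are hypothetical names, and the in-memory {{Map}} merely stands in for the state store that a real {{groupByKey()}} followed by an aggregation on the {{KStream}} would maintain (together with its own changelog topic).

```scala
object LatestValueSketch {
  /** Reducer that ignores the previous aggregate and keeps the newest value. */
  def keepLatest[V](previous: V, latest: V): V = latest

  /**
   * Folds a keyed record stream into a table holding the latest value per key.
   * This models what the dummy aggregation computes; it is not the Streams DSL.
   */
  def toTable[K, V](records: Seq[(K, V)]): Map[K, V] =
    records.foldLeft(Map.empty[K, V]) { case (table, (key, value)) =>
      val merged = table.get(key).map(prev => keepLatest(prev, value)).getOrElse(value)
      table.updated(key, merged)
    }

  def main(args: Array[String]): Unit = {
    // Records arrive in stream order; later values for the same key win.
    val records = Seq("a" -> "1", "b" -> "2", "a" -> "3")
    println(toTable(records))
  }
}
```

Because the aggregate for a key is simply replaced by each new value, the resulting table has the same content a table read directly from the topic would have, but its state is backed by a dedicated changelog rather than by the source topic.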
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264921#comment-16264921 ]

Matthias J. Sax commented on KAFKA-6269:

Thanks for reporting this. If you get the exception {{Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388}}, this means that a thread started to restore a state from a changelog topic and the end offset moved during the process. This should never happen unless the task was migrated to another thread and that other thread writes into the changelog topic. If the state is not migrated, the restoring thread is the only one that is "allowed" to write into the changelog, and as long as it restores, it does not write.

What I could think of is a bug related to an optimization we do: as you read KTables from source topics, there is no additional changelog topic, because the source topic can be used to recreate the state (the error message shows that the source topic is used for store recovery). Thus, an upstream processor that writes to the source topic can of course append data to this topic. We actually have a guard for this case, but we changed some code here, maybe introducing a bug so that this guard no longer works properly: the restoring thread "thinks" it reads a changelog topic (while it actually reads a source topic) for recovery and fails even though it shouldn't.

I'll try to reproduce this locally and get back to you.
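The intended behaviour described in the comment above (fail when the log end offset moves during a restore, except when the restored partition is a source topic that upstream processors may legitimately append to) can be modelled roughly as below. This is a simplified sketch under stated assumptions, not the Kafka Streams implementation: {{checkAfterRestore}}, {{Outcome}}, and the {{isSourceTopic}} flag that models the guard are hypothetical names; only the message text is taken from the reported exception.

```scala
object EndOffsetCheckSketch {
  sealed trait Outcome
  case object RestoreCompleted extends Outcome
  final case class TaskMigrated(message: String) extends Outcome

  /**
   * Compares the end offset captured when the restore started with the offset
   * observed afterwards. `isSourceTopic` is a hypothetical flag modelling the guard:
   * a source topic reused as changelog may grow during the restore, a real
   * changelog topic may not.
   */
  def checkAfterRestore(partition: String,
                        endOffsetAtStart: Long,
                        currentOffset: Long,
                        isSourceTopic: Boolean): Outcome =
    if (currentOffset > endOffsetAtStart && !isSourceTopic)
      TaskMigrated(
        s"Log end offset of $partition should not change while restoring: " +
          s"old end offset $endOffsetAtStart, current offset $currentOffset")
    else
      RestoreCompleted

  def main(args: Array[String]): Unit = {
    // Offsets from the report; with a working guard the source-topic case passes.
    println(checkAfterRestore("entity-B-exists-0", 4750539L, 4751388L, isSourceTopic = true))
    println(checkAfterRestore("entity-B-exists-0", 4750539L, 4751388L, isSourceTopic = false))
  }
}
```

In this model the reported failure corresponds to the second call: the partition is treated as a plain changelog even though it is a source topic, so the moved end offset is interpreted as a task migration.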