[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266157#comment-16266157 ]
Guozhang Wang commented on KAFKA-6269:
--------------------------------------

[~mjsax] What bug have you found that relates to the guard not being effective?

> KTable state restore fails after rebalance
> ------------------------------------------
>
>                 Key: KAFKA-6269
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6269
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andreas Schroeder
>            Priority: Blocker
>             Fix For: 1.1.0, 1.0.1
>
>
> I have the following Kafka Streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde,
> valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                   sourceTopic: String,
>                   existsTopic: String,
>                   valueSerde: Serde[V],
>                   valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>     .withKeySerde(Serdes.String())
>     .withValueSerde(valueSerde)
>     .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)
> val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30
> partitions so that there is a total of 4 x 30 = 120 partitions to consume.
> The initial launch of the processors works fine, but killing one processor
> and letting it re-join leads to some faulty behaviour in the stream threads.
> First, the total number of assigned partitions over all processor machines
> is larger than 120 (sometimes 157, sometimes just 132), so the partition /
> task assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly
> with the error message 'Detected a task that got migrated to another
> thread.' We gave the processor half an hour to recover; usually, rebuilding
> the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got
> migrated to another thread. This implies that this thread missed a rebalance
> and dropped out of the consumer group. Trying to rejoin the consumer group
> now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
> > StreamsTask taskId: 1_0
> > > ProcessorTopology:
> >     KSTREAM-SOURCE-0000000008:
> >         topics: [entity-A-exists]
> >         children: [KTABLE-SOURCE-0000000009]
> >     KTABLE-SOURCE-0000000009:
> >         states: [entity-A-exists-persisted]
> >         children: [KTABLE-JOINTHIS-0000000011]
> >     KTABLE-JOINTHIS-0000000011:
> >         states: [entity-B-exists-persisted]
> >         children: [KTABLE-MERGE-0000000010]
> >     KTABLE-MERGE-0000000010:
> >         states: [entity-A-joined-with-entity-B]
> >     KSTREAM-SOURCE-0000000003:
> >         topics: [entity-B-exists]
> >         children: [KTABLE-SOURCE-0000000004]
> >     KTABLE-SOURCE-0000000004:
> >         states: [entity-B-exists-persisted]
> >         children: [KTABLE-JOINOTHER-0000000012]
> >     KTABLE-JOINOTHER-0000000012:
> >         states: [entity-A-exists-persisted]
> >         children: [KTABLE-MERGE-0000000010]
> >     KTABLE-MERGE-0000000010:
> >         states: [entity-A-joined-with-entity-B]
> > Partitions [entity-A-exists-0, entity-B-exists-0]
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> That one surprises me: the KTable state store entity-B-exists-persisted is
> rebuilt from entity-B-exists, which of course can change while the rebuild
> is happening, since the topic entity-B-exists is fed by another stream
> thread.
> Another one, very similar:
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-A-exists-24 should not change while restoring: old end offset 6483978, current offset 6485108
> > StreamsTask taskId: 1_24
> > > ProcessorTopology:
> >     KSTREAM-SOURCE-0000000008:
> >         topics: [entity-A-exists]
> >         children: [KTABLE-SOURCE-0000000009]
> >     KTABLE-SOURCE-0000000009:
> >         states: [entity-A-exists-persisted]
> >         children: [KTABLE-JOINTHIS-0000000011]
> >     KTABLE-JOINTHIS-0000000011:
> >         states: [entity-B-exists-persisted]
> >         children: [KTABLE-MERGE-0000000010]
> >     KTABLE-MERGE-0000000010:
> >         states: [entity-A-joined-with-entity-B]
> >     KSTREAM-SOURCE-0000000003:
> >         topics: [entity-B-exists]
> >         children: [KTABLE-SOURCE-0000000004]
> >     KTABLE-SOURCE-0000000004:
> >         states: [entity-B-exists-persisted]
> >         children: [KTABLE-JOINOTHER-0000000012]
> >     KTABLE-JOINOTHER-0000000012:
> >         states: [entity-A-exists-persisted]
> >         children: [KTABLE-MERGE-0000000010]
> >     KTABLE-MERGE-0000000010:
> >         states: [entity-A-joined-with-entity-B]
> > Partitions [entity-A-exists-24, entity-B-exists-24]
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> Again, the topic entity-A-exists is fed by another stream thread.
> We saw around 60000 such errors per minute, as the stream threads
> continuously try to recover and fail.
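
As a rough illustration of the two errors quoted above, the following self-contained Scala sketch models a restore guard that remembers the log end offset seen when restoration starts and rejects any later position beyond it. It does not use the Kafka Streams API: `RestoreGuard`, `checkRestored`, and `MigratedException` are hypothetical names, and the comparison itself is an assumption inferred from the error message, not the actual `StoreChangelogReader` logic.

```scala
// Hypothetical stand-in for TaskMigratedException; not the Kafka class.
final case class MigratedException(msg: String) extends RuntimeException(msg)

object RestoreGuard {
  // Assumption: the end offset is captured once, when restoration starts,
  // and any restored position beyond it is treated as a migrated task.
  def checkRestored(partition: String,
                    endOffsetAtStart: Long,
                    restoredPosition: Long): Either[MigratedException, Long] =
    if (restoredPosition > endOffsetAtStart)
      Left(MigratedException(
        s"Log end offset of $partition should not change while restoring: " +
          s"old end offset $endOffsetAtStart, current offset $restoredPosition"))
    else
      Right(restoredPosition)
}

object RestoreGuardDemo {
  def main(args: Array[String]): Unit = {
    // Offsets taken from the first error in the report.
    // Topic unchanged during restore: the guard passes.
    println(RestoreGuard.checkRestored("entity-B-exists-0", 4750539L, 4750539L))
    // Topic received new records during restore: the guard trips.
    println(RestoreGuard.checkRestored("entity-B-exists-0", 4750539L, 4751388L))
  }
}
```

Under these assumptions, a guard of this shape would be expected to trip whenever the topic a store is rebuilt from keeps receiving writes during restoration, as entity-B-exists and entity-A-exists do in the report.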