[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266157#comment-16266157
 ] 

Guozhang Wang commented on KAFKA-6269:
--------------------------------------

[~mjsax] What bug have you found that relates to the guard not effective?

> KTable state restore fails after rebalance
> ------------------------------------------
>
>                 Key: KAFKA-6269
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6269
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andreas Schroeder
>            Priority: Blocker
>             Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A   -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, 
> valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                           sourceTopic: String,
>                           existsTopic: String,
>                           valueSerde: Serde[V],
>                           valueMapper: ValueMapper[String, V]): 
> KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, 
> String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>       .withKeySerde(Serdes.String())
>       .withValueSerde(valueSerde)
>       .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if 
> (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), 
> mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => 
> EntityDiff.fromJoin(a, b)
> val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, 
> joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 
> partitions so that there is a total of 4 x 30 = 120 partitions to consume. 
> The initial launch of the processor works fine, but when killing one 
> processor and letting him re-join the stream threads leads to some faulty 
> behaviour.
> Fist, the total number of assigned partitions over all processor machines is 
> larger than 120 (sometimes 157, sometimes just 132), so the partition / task 
> assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly 
> with the error message of 'Detected a task that got migrated to another 
> thread.' We gave the processor half an hour to recover; usually, rebuilding 
> the KTable states take around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got 
> migrated to another thread. This implies that this thread missed a rebalance 
> and dropped out of the consumer group. Trying to rejoin the consumer group 
> now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of 
> entity-B-exists-0 should not change while restoring: old end offset 4750539, 
> current offset 4751388
> > StreamsTask taskId: 1_0
> > >   ProcessorTopology:
> >             KSTREAM-SOURCE-0000000008:
> >                     topics:         [entity-A-exists]
> >                     children:       [KTABLE-SOURCE-0000000009]
> >             KTABLE-SOURCE-0000000009:
> >                     states:         [entity-A-exists-persisted]
> >                     children:       [KTABLE-JOINTHIS-0000000011]
> >             KTABLE-JOINTHIS-0000000011:
> >                     states:         [entity-B-exists-persisted]
> >                     children:       [KTABLE-MERGE-0000000010]
> >             KTABLE-MERGE-0000000010:
> >                     states:         [entity-A-joined-with-entity-B]
> >             KSTREAM-SOURCE-0000000003:
> >                     topics:         [entity-B-exists]
> >                     children:       [KTABLE-SOURCE-0000000004]
> >             KTABLE-SOURCE-0000000004:
> >                     states:         [entity-B-exists-persisted]
> >                     children:       [KTABLE-JOINOTHER-0000000012]
> >             KTABLE-JOINOTHER-0000000012:
> >                     states:         [entity-A-exists-persisted]
> >                     children:       [KTABLE-MERGE-0000000010]
> >             KTABLE-MERGE-0000000010:
> >                     states:         [entity-A-joined-with-entity-B]
> > Partitions [entity-A-exists-0, entity-B-exists-0]
>       at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
>       at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> That one surprises me: the KTable state store entity-B-exists-persisted is 
> rebuilt from entity-B-exists that of course can change while the rebuild is 
> happening, since it the topic entity-B-exists is fed by another stream thread.
> Another one, very similar:
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of 
> entity-A-exists-24 should not change while restoring: old end offset 6483978, 
> current offset 6485108
> > StreamsTask taskId: 1_24
> > >   ProcessorTopology:
> >             KSTREAM-SOURCE-0000000008:
> >                     topics:         [entity-A-exists]
> >                     children:       [KTABLE-SOURCE-0000000009]
> >             KTABLE-SOURCE-0000000009:
> >                     states:         [entity-A-exists-persisted]
> >                     children:       [KTABLE-JOINTHIS-0000000011]
> >             KTABLE-JOINTHIS-0000000011:
> >                     states:         [entity-B-exists-persisted]
> >                     children:       [KTABLE-MERGE-0000000010]
> >             KTABLE-MERGE-0000000010:
> >                     states:         [entity-A-joined-with-entity-B]
> >             KSTREAM-SOURCE-0000000003:
> >                     topics:         [entity-B-exists]
> >                     children:       [KTABLE-SOURCE-0000000004]
> >             KTABLE-SOURCE-0000000004:
> >                     states:         [entity-B-exists-persisted]
> >                     children:       [KTABLE-JOINOTHER-0000000012]
> >             KTABLE-JOINOTHER-0000000012:
> >                     states:         [entity-A-exists-persisted]
> >                     children:       [KTABLE-MERGE-0000000010]
> >             KTABLE-MERGE-0000000010:
> >                     states:         [entity-A-joined-with-entity-B]
> > Partitions [entity-A-exists-24, entity-B-exists-24]
>       at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
>       at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> Again, the topic entity-A-exists is fed by another stream thread.
> We saw around 60000 such errors per minute, as the stream threads 
> continuously try to recover and fail.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to