[
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271399#comment-16271399
]
Guozhang Wang commented on KAFKA-6269:
--------------------------------------
[~andreas-schroeder] We have spotted the regression bug introduced in 1.0.0 and
we are working on a fix that will be included in the upcoming 1.0.1 release.
However, the bug-fix release may only come in early December, and I'm wondering
how much this issue is blocking your team's progress. Would you be OK with
compiling from the {{1.0}} branch directly once we have the bug fix in?
> KTable state restore fails after rebalance
> ------------------------------------------
>
> Key: KAFKA-6269
> URL: https://issues.apache.org/jira/browse/KAFKA-6269
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Andreas Schroeder
> Priority: Blocker
> Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde,
> valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                   sourceTopic: String,
>                   existsTopic: String,
>                   valueSerde: Serde[V],
>                   valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   // Write the mapped values to the intermediate "exists" topic.
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   // In-memory store with changelogging disabled.
>   val materialized =
>     Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>       .withKeySerde(Serdes.String())
>       .withValueSerde(valueSerde)
>       .withLoggingDisabled()
>   // Read the "exists" topic back as a KTable.
>   builder.table(existsTopic, materialized)
> }
>
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] =
>   (value: String) => if (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] =
>   (a, b) => EntityDiff.fromJoin(a, b)
> // The join result store keeps changelogging enabled.
> val materialized =
>   Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>     .withKeySerde(Serdes.String())
>     .withValueSerde(EntityDiffSerde)
>     .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] =
>   entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30
> partitions so that there is a total of 4 x 30 = 120 partitions to consume.
> The initial launch of the processors works fine, but killing one processor
> and letting it rejoin causes the stream threads to show faulty behaviour.
> First, the total number of assigned partitions over all processor machines is
> larger than 120 (sometimes 157, sometimes just 132), so the partition / task
> assignment seems to assign the same job to different stream threads.
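One way to confirm a suspected double assignment like this is to collect the partitions each stream thread reports (from logs or tooling) and look for partitions that appear more than once. The following is a minimal sketch in plain Scala; `AssignmentCheck`, its methods, and the thread names in the usage example are hypothetical and not part of Kafka Streams.

```scala
// Hypothetical helper, not part of Kafka Streams: it only inspects
// assignment data that was collected elsewhere (e.g. from log output).
object AssignmentCheck {

  /** Total number of partition assignments over all stream threads. */
  def totalAssigned(assignments: Map[String, Seq[String]]): Int =
    assignments.values.map(_.size).sum

  /** Partitions assigned to more than one stream thread, with the owning threads. */
  def duplicates(assignments: Map[String, Seq[String]]): Map[String, Seq[String]] =
    assignments.toSeq
      .flatMap { case (thread, partitions) => partitions.map(p => p -> thread) }
      .groupBy(_._1)
      .map { case (partition, pairs) => partition -> pairs.map(_._2).sorted }
      .filter { case (_, threads) => threads.size > 1 }
}
```

For example, with the made-up input `Map("t1" -> Seq("entity-A-exists-0", "entity-B-exists-0"), "t2" -> Seq("entity-A-exists-0"))`, `totalAssigned` counts three assignments while only two distinct partitions exist, and `duplicates` reports `entity-A-exists-0` as owned by both `t1` and `t2`.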
> The processor machines trying to rejoin the consumer group fail constantly
> with the error message 'Detected a task that got migrated to another
> thread.' We gave the processor half an hour to recover; usually, rebuilding
> the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got
> migrated to another thread. This implies that this thread missed a rebalance
> and dropped out of the consumer group. Trying to rejoin the consumer group
> now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of
> entity-B-exists-0 should not change while restoring: old end offset 4750539,
> current offset 4751388
> > StreamsTask taskId: 1_0
> > > ProcessorTopology:
> > KSTREAM-SOURCE-0000000008:
> > topics: [entity-A-exists]
> > children: [KTABLE-SOURCE-0000000009]
> > KTABLE-SOURCE-0000000009:
> > states: [entity-A-exists-persisted]
> > children: [KTABLE-JOINTHIS-0000000011]
> > KTABLE-JOINTHIS-0000000011:
> > states: [entity-B-exists-persisted]
> > children: [KTABLE-MERGE-0000000010]
> > KTABLE-MERGE-0000000010:
> > states: [entity-A-joined-with-entity-B]
> > KSTREAM-SOURCE-0000000003:
> > topics: [entity-B-exists]
> > children: [KTABLE-SOURCE-0000000004]
> > KTABLE-SOURCE-0000000004:
> > states: [entity-B-exists-persisted]
> > children: [KTABLE-JOINOTHER-0000000012]
> > KTABLE-JOINOTHER-0000000012:
> > states: [entity-A-exists-persisted]
> > children: [KTABLE-MERGE-0000000010]
> > KTABLE-MERGE-0000000010:
> > states: [entity-A-joined-with-entity-B]
> > Partitions [entity-A-exists-0, entity-B-exists-0]
> > at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
> > at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
> > at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> That one surprises me: the KTable state store entity-B-exists-persisted is
> rebuilt from entity-B-exists, which of course can change while the rebuild is
> happening, since the topic entity-B-exists is fed by another stream thread.
> Another one, very similar:
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of
> entity-A-exists-24 should not change while restoring: old end offset 6483978,
> current offset 6485108
> > StreamsTask taskId: 1_24
> > > ProcessorTopology:
> > KSTREAM-SOURCE-0000000008:
> > topics: [entity-A-exists]
> > children: [KTABLE-SOURCE-0000000009]
> > KTABLE-SOURCE-0000000009:
> > states: [entity-A-exists-persisted]
> > children: [KTABLE-JOINTHIS-0000000011]
> > KTABLE-JOINTHIS-0000000011:
> > states: [entity-B-exists-persisted]
> > children: [KTABLE-MERGE-0000000010]
> > KTABLE-MERGE-0000000010:
> > states: [entity-A-joined-with-entity-B]
> > KSTREAM-SOURCE-0000000003:
> > topics: [entity-B-exists]
> > children: [KTABLE-SOURCE-0000000004]
> > KTABLE-SOURCE-0000000004:
> > states: [entity-B-exists-persisted]
> > children: [KTABLE-JOINOTHER-0000000012]
> > KTABLE-JOINOTHER-0000000012:
> > states: [entity-A-exists-persisted]
> > children: [KTABLE-MERGE-0000000010]
> > KTABLE-MERGE-0000000010:
> > states: [entity-A-joined-with-entity-B]
> > Partitions [entity-A-exists-24, entity-B-exists-24]
> > at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
> > at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
> > at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> Again, the topic entity-A-exists is fed by another stream thread.
> We saw around 60000 such errors per minute, as the stream threads
> continuously try to recover and fail.
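With this many errors, it may help to extract the partition and the two offsets from each exception message and see how far the reported offset moved past the old end offset. The sketch below assumes the message wording shown in the two traces above and that quote markers have already been stripped; `RestoreErrorParser` and `OffsetJump` are hypothetical names, not Kafka classes.

```scala
// Hypothetical log-analysis helper, not part of Kafka Streams.
object RestoreErrorParser {

  final case class OffsetJump(partition: String, oldEndOffset: Long, currentOffset: Long) {
    /** How far the reported current offset lies past the old end offset. */
    def delta: Long = currentOffset - oldEndOffset
  }

  // Matches the wording of the TaskMigratedException message quoted in the report.
  private val Pattern =
    """Log end offset of (\S+) should not change while restoring: old end offset (\d+), current offset (\d+)""".r.unanchored

  def parse(message: String): Option[OffsetJump] = {
    // Mail clients wrap long lines, so collapse whitespace runs before matching.
    val flat = message.replaceAll("\\s+", " ")
    flat match {
      case Pattern(partition, oldEnd, current) =>
        Some(OffsetJump(partition, oldEnd.toLong, current.toLong))
      case _ => None
    }
  }
}
```

Applied to the first exception above, `parse` yields partition `entity-B-exists-0` with a delta of 849; for the second, `entity-A-exists-24` with a delta of 1130.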