Andreas Schroeder created KAFKA-6269:
----------------------------------------

             Summary: KTable state restore fails after rebalance
                 Key: KAFKA-6269
                 URL: https://issues.apache.org/jira/browse/KAFKA-6269
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.0.0
            Reporter: Andreas Schroeder


I have the following Kafka Streams topology:

entity-B -> map step -> entity-B-exists (with state store)
entity-A -> map step -> entity-A-exists (with state store)

(entity-B-exists, entity-A-exists) -> outer join with state store.

The topology-building code looks like this (some data type, serde, value mapper,
and joiner code omitted):

def buildTable[V](builder: StreamsBuilder,
                  sourceTopic: String,
                  existsTopic: String,
                  valueSerde: Serde[V],
                  valueMapper: ValueMapper[String, V]): KTable[String, V] = {

  val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
  val transformed: KStream[String, V] = stream.mapValues(valueMapper)
  transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))

  val inMemoryStoreName = s"$existsTopic-persisted"

  val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
    .withKeySerde(Serdes.String())
    .withValueSerde(valueSerde)
    .withLoggingDisabled()

  builder.table(existsTopic, materialized)
}


val builder = new StreamsBuilder
val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null

val entitiesB: KTable[String, EntityBInfo] =
  buildTable(builder,
             "entity-B",
             "entity-B-exists",
             EntityBInfoSerde,
             ListingImagesToEntityBInfo)

val entitiesA: KTable[String, String] =
  buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)

val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)

val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
  .withKeySerde(Serdes.String())
  .withValueSerde(EntityDiffSerde)
  .withLoggingEnabled(new java.util.HashMap[String, String]())

val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)


We run 4 processor machines with 30 stream threads each; each topic has 30 partitions,
so there is a total of 4 x 30 = 120 partitions to consume. The initial launch of the
processor works fine, but killing one processor and letting it re-join the stream
threads leads to some faulty behaviour.
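
For reference, here is roughly how each processor is configured and started (a minimal
sketch; the application id, bootstrap servers, and other values below are placeholders,
not our exact configuration):

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

// Minimal sketch of the per-machine setup; values are placeholders.
val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "entity-diff-processor")  // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")          // placeholder
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "30")                 // 30 stream threads per machine

val streams = new KafkaStreams(builder.build(), props)
streams.start()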

First, the total number of assigned partitions over all processor machines is
larger than 120 (sometimes 157, sometimes just 132), so the partition / task
assignment seems to assign the same job to different stream threads.

The processor machines trying to re-join the consumer group fail constantly
with the error message 'Detected a task that got migrated to another thread.'
We gave the processor half an hour to recover; usually, rebuilding the
KTable states takes around 20 seconds (with Kafka 0.11.0.1).

Here are the details of the errors we see:

stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.

org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
> StreamsTask taskId: 1_0
> >     ProcessorTopology:
>               KSTREAM-SOURCE-0000000008:
>                       topics:         [entity-A-exists]
>                       children:       [KTABLE-SOURCE-0000000009]
>               KTABLE-SOURCE-0000000009:
>                       states:         [entity-A-exists-persisted]
>                       children:       [KTABLE-JOINTHIS-0000000011]
>               KTABLE-JOINTHIS-0000000011:
>                       states:         [entity-B-exists-persisted]
>                       children:       [KTABLE-MERGE-0000000010]
>               KTABLE-MERGE-0000000010:
>                       states:         [entity-A-joined-with-entity-B]
>               KSTREAM-SOURCE-0000000003:
>                       topics:         [entity-B-exists]
>                       children:       [KTABLE-SOURCE-0000000004]
>               KTABLE-SOURCE-0000000004:
>                       states:         [entity-B-exists-persisted]
>                       children:       [KTABLE-JOINOTHER-0000000012]
>               KTABLE-JOINOTHER-0000000012:
>                       states:         [entity-A-exists-persisted]
>                       children:       [KTABLE-MERGE-0000000010]
>               KTABLE-MERGE-0000000010:
>                       states:         [entity-A-joined-with-entity-B]
> Partitions [entity-A-exists-0, entity-B-exists-0]

        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

That one surprises me: the KTable state store entity-B-exists-persisted is
rebuilt from entity-B-exists, which of course can change while the rebuild
is happening, since the topic entity-B-exists is fed by another stream
thread.
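
To make the failure mode concrete, here is a simplified, hypothetical sketch of the
kind of end-offset check that appears to fire during restore (illustrative only, not
the actual StoreChangelogReader code; the method name and exception are stand-ins):

import org.apache.kafka.common.TopicPartition

// Hypothetical sketch of the restore check (not the real Kafka Streams code):
// the end offset is captured when restoration starts, and the restore fails
// if the partition's current end offset has moved past it.
def checkEndOffsetDuringRestore(partition: TopicPartition,
                                recordedEndOffset: Long,  // captured at restore start
                                currentEndOffset: Long    // re-read from the broker while restoring
                               ): Unit = {
  // entity-A-exists / entity-B-exists are live source topics that other stream
  // threads keep writing to, so their end offsets keep moving during restore
  // and a check like this keeps failing.
  if (currentEndOffset != recordedEndOffset) {
    throw new IllegalStateException(  // the real code surfaces this as a TaskMigratedException
      s"Log end offset of $partition should not change while restoring: " +
        s"old end offset $recordedEndOffset, current offset $currentEndOffset")
  }
}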

Another one, very similar:

org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-A-exists-24 should not change while restoring: old end offset 6483978, current offset 6485108
> StreamsTask taskId: 1_24
> >     ProcessorTopology:
>               KSTREAM-SOURCE-0000000008:
>                       topics:         [entity-A-exists]
>                       children:       [KTABLE-SOURCE-0000000009]
>               KTABLE-SOURCE-0000000009:
>                       states:         [entity-A-exists-persisted]
>                       children:       [KTABLE-JOINTHIS-0000000011]
>               KTABLE-JOINTHIS-0000000011:
>                       states:         [entity-B-exists-persisted]
>                       children:       [KTABLE-MERGE-0000000010]
>               KTABLE-MERGE-0000000010:
>                       states:         [entity-A-joined-with-entity-B]
>               KSTREAM-SOURCE-0000000003:
>                       topics:         [entity-B-exists]
>                       children:       [KTABLE-SOURCE-0000000004]
>               KTABLE-SOURCE-0000000004:
>                       states:         [entity-B-exists-persisted]
>                       children:       [KTABLE-JOINOTHER-0000000012]
>               KTABLE-JOINOTHER-0000000012:
>                       states:         [entity-A-exists-persisted]
>                       children:       [KTABLE-MERGE-0000000010]
>               KTABLE-MERGE-0000000010:
>                       states:         [entity-A-joined-with-entity-B]
> Partitions [entity-A-exists-24, entity-B-exists-24]

        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

Again, the topic entity-A-exists is fed by another stream thread.

We saw around 60000 such errors per minute, as the stream threads continuously
try to recover and fail.



