[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16300942#comment-16300942
 ] 

ASF GitHub Bot commented on KAFKA-6269:
---------------------------------------

mjsax commented on a change in pull request #4300: KAFKA-6269: KTable restore fails after rebalance
URL: https://github.com/apache/kafka/pull/4300#discussion_r158426722
 
 

 ##########
 File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 ##########
 @@ -385,6 +379,130 @@ public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore
         } catch (final TaskMigratedException expected) { /* ignore */ }
     }
 
+    @Test
+    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() {
+        final int totalMessages = 10;
+        assignPartition(totalMessages, topicPartition);
+        // records 0..4
+        addRecords(5, topicPartition, 0);
+        //EOS enabled commit marker at offset 5 so rest of records 6..10
+        addRecords(5, topicPartition, 6);
+        consumer.assign(Collections.<TopicPartition>emptyList());
+
+        // end offsets should start after commit marker of 5 from above
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 6L));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+        try {
+            changelogReader.restore(active);
+            fail("Should have thrown task migrated exception");
+        } catch (final TaskMigratedException expected) {
+            /* ignore */
+        }
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() {
+        final int totalMessages = 10;
+        setupConsumer(totalMessages, topicPartition);
+        // records have offsets 0..9; 10 is the commit marker, so 11 is the end offset
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
+
+        consumer.assign(Collections.<TopicPartition>emptyList());
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(10));
+    }
+
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSDisabled() {
+        final int totalMessages = 10;
+        setupConsumer(totalMessages, topicPartition);
+
+        consumer.assign(Collections.<TopicPartition>emptyList());
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(10));
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopic() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(5));
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopic() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 10, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(10));
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopicEOSEnabled() {
+        final int totalMessages = 10;
+        assignPartition(totalMessages, topicPartition);
+        // records 0..4 last offset before commit is 4
+        addRecords(5, topicPartition, 0);
+        //EOS enabled so commit marker at offset 5 so records start at 6
+        addRecords(5, topicPartition, 6);
+        consumer.assign(Collections.<TopicPartition>emptyList());
+        // commit marker is 5 so ending offset is 6
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 6L));
 
 Review comment:
   As above: the end offset should be `12`, and the offset limit passed on the next line should be `6`.
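
The arithmetic behind those two numbers can be sketched as follows. This is only an illustration, assuming each transaction's records occupy consecutive offsets and each transaction is closed by exactly one commit marker that takes one offset; the class and method names are hypothetical and are not part of Kafka or of this pull request.

```java
// Hypothetical helper, not Kafka code: models offsets under EOS where every
// transaction is followed by a single commit marker occupying one offset.
class EosOffsets {

    // Returns the log end offset (the offset the next write would receive)
    // after the given transactions have been written back to back.
    static long endOffsetAfter(int... recordsPerTransaction) {
        long next = 0L;
        for (int records : recordsPerTransaction) {
            next += records; // data records of this transaction
            next += 1;       // its commit marker
        }
        return next;
    }

    public static void main(String[] args) {
        // Records 0..4, marker at 5, records 6..10, marker at 11.
        System.out.println(endOffsetAfter(5, 5));
        // Only the first transaction: records 0..4, marker at 5.
        System.out.println(endOffsetAfter(5));
    }
}
```

Under these assumptions, two transactions of five records each give an end offset of 12, and the first transaction alone ends at 6, which matches the offset limit suggested in the comment.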

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KTable state restore fails after rebalance
> ------------------------------------------
>
>                 Key: KAFKA-6269
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6269
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andreas Schroeder
>            Assignee: Bill Bejeck
>            Priority: Blocker
>             Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A   -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, 
> valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                           sourceTopic: String,
>                           existsTopic: String,
>                           valueSerde: Serde[V],
>                           valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>       .withKeySerde(Serdes.String())
>       .withValueSerde(valueSerde)
>       .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)
> val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 
> partitions so that there is a total of 4 x 30 = 120 partitions to consume. 
> The initial launch of the processors works fine, but killing one 
> processor and letting it rejoin leads to some faulty behaviour.
> First, the total number of assigned partitions over all processor machines is 
> larger than 120 (sometimes 157, sometimes just 132), so the partition / task 
> assignment seems to assign the same job to different stream threads.
> The processor machines trying to rejoin the consumer group fail constantly 
> with the error message 'Detected a task that got migrated to another 
> thread.' We gave the processor half an hour to recover; usually, rebuilding 
> the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got 
> migrated to another thread. This implies that this thread missed a rebalance 
> and dropped out of the consumer group. Trying to rejoin the consumer group 
> now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
> > StreamsTask taskId: 1_0
> > >   ProcessorTopology:
> >             KSTREAM-SOURCE-0000000008:
> >                     topics:         [entity-A-exists]
> >                     children:       [KTABLE-SOURCE-0000000009]
> >             KTABLE-SOURCE-0000000009:
> >                     states:         [entity-A-exists-persisted]
> >                     children:       [KTABLE-JOINTHIS-0000000011]
> >             KTABLE-JOINTHIS-0000000011:
> >                     states:         [entity-B-exists-persisted]
> >                     children:       [KTABLE-MERGE-0000000010]
> >             KTABLE-MERGE-0000000010:
> >                     states:         [entity-A-joined-with-entity-B]
> >             KSTREAM-SOURCE-0000000003:
> >                     topics:         [entity-B-exists]
> >                     children:       [KTABLE-SOURCE-0000000004]
> >             KTABLE-SOURCE-0000000004:
> >                     states:         [entity-B-exists-persisted]
> >                     children:       [KTABLE-JOINOTHER-0000000012]
> >             KTABLE-JOINOTHER-0000000012:
> >                     states:         [entity-A-exists-persisted]
> >                     children:       [KTABLE-MERGE-0000000010]
> >             KTABLE-MERGE-0000000010:
> >                     states:         [entity-A-joined-with-entity-B]
> > Partitions [entity-A-exists-0, entity-B-exists-0]
>       at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
>       at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
>       at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> That one surprises me: the KTable state store entity-B-exists-persisted is 
> rebuilt from entity-B-exists, which of course can change while the rebuild is 
> happening, since the topic entity-B-exists is fed by another stream thread.
> Another one, very similar:
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-A-exists-24 should not change while restoring: old end offset 6483978, current offset 6485108
> > StreamsTask taskId: 1_24
> > >   ProcessorTopology:
> >             KSTREAM-SOURCE-0000000008:
> >                     topics:         [entity-A-exists]
> >                     children:       [KTABLE-SOURCE-0000000009]
> >             KTABLE-SOURCE-0000000009:
> >                     states:         [entity-A-exists-persisted]
> >                     children:       [KTABLE-JOINTHIS-0000000011]
> >             KTABLE-JOINTHIS-0000000011:
> >                     states:         [entity-B-exists-persisted]
> >                     children:       [KTABLE-MERGE-0000000010]
> >             KTABLE-MERGE-0000000010:
> >                     states:         [entity-A-joined-with-entity-B]
> >             KSTREAM-SOURCE-0000000003:
> >                     topics:         [entity-B-exists]
> >                     children:       [KTABLE-SOURCE-0000000004]
> >             KTABLE-SOURCE-0000000004:
> >                     states:         [entity-B-exists-persisted]
> >                     children:       [KTABLE-JOINOTHER-0000000012]
> >             KTABLE-JOINOTHER-0000000012:
> >                     states:         [entity-A-exists-persisted]
> >                     children:       [KTABLE-MERGE-0000000010]
> >             KTABLE-MERGE-0000000010:
> >                     states:         [entity-A-joined-with-entity-B]
> > Partitions [entity-A-exists-24, entity-B-exists-24]
>       at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
>       at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
>       at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> Again, the topic entity-A-exists is fed by another stream thread.
> We saw around 60000 such errors per minute, as the stream threads 
> continuously try to recover and fail.
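
The distinction the report points at, and that the test names in the quoted diff draw, can be modelled roughly as below. This is a simplified sketch and not Kafka's StoreChangelogReader: the class, enum, and method are hypothetical, and it assumes that a store with a dedicated changelog topic is registered with an offset limit of Long.MAX_VALUE while a store restored from a source topic gets a finite limit. Commit markers are not modelled.

```java
// Simplified model of an end-offset check during state restoration.
// NOT Kafka's implementation; every name here is hypothetical.
class RestoreCheckSketch {

    enum Outcome { CONTINUE, COMPLETED, TASK_MIGRATED }

    // position:         offset the restore consumer has reached
    // endOffsetAtStart: log end offset captured when restoration began
    // offsetLimit:      Long.MAX_VALUE for a dedicated changelog topic (assumption),
    //                   otherwise the offset up to which a source topic is restored
    static Outcome check(long position, long endOffsetAtStart, long offsetLimit) {
        boolean dedicatedChangelog = offsetLimit == Long.MAX_VALUE;
        long restoreUpTo = Math.min(endOffsetAtStart, offsetLimit);
        if (position < restoreUpTo) {
            return Outcome.CONTINUE;
        }
        // Only the owning task writes to a dedicated changelog, so growth
        // there suggests another thread now owns the task.
        if (dedicatedChangelog && position > endOffsetAtStart) {
            return Outcome.TASK_MIGRATED;
        }
        // A source topic can legitimately grow while it is being read.
        return Outcome.COMPLETED;
    }

    public static void main(String[] args) {
        long oldEnd = 4750539L;  // "old end offset" from the first trace
        long current = 4751388L; // "current offset" from the first trace
        System.out.println(check(current, oldEnd, Long.MAX_VALUE));
        System.out.println(check(current, oldEnd, oldEnd));
    }
}
```

With the offsets from the first trace, this model reports a migrated task only when the topic is treated as a dedicated changelog; when the same topic is treated as a source topic with a finite limit, restoration simply completes.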



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
