[ 
https://issues.apache.org/jira/browse/KAFKA-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473238#comment-16473238
 ] 

ASF GitHub Bot commented on KAFKA-6730:
---------------------------------------

ConcurrencyPractitioner closed pull request #4901: [KAFKA-6730] Simplify state 
store recovery
URL: https://github.com/apache/kafka/pull/4901
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 9b67fc4263c..a20f1acf675 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -52,6 +52,8 @@
     private final Map<TopicPartition, StateRestorer> stateRestorers = new 
HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsRestoring = new 
HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsInitializing = new 
HashMap<>();
+    private boolean hasRetrievedOffsets;
+    private Map<TopicPartition, Long> updatedEndOffsets;
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
                                 final StateRestoreListener 
userStateRestoreListener,
@@ -59,6 +61,7 @@ public StoreChangelogReader(final Consumer<byte[], byte[]> 
restoreConsumer,
         this.restoreConsumer = restoreConsumer;
         this.log = logContext.logger(getClass());
         this.userStateRestoreListener = userStateRestoreListener;
+        this.hasRetrievedOffsets = false;
     }
 
     @Override
@@ -83,10 +86,25 @@ public void register(final StateRestorer restorer) {
 
         final Set<TopicPartition> restoringPartitions = new 
HashSet<>(needsRestoring.keySet());
         try {
-            final ConsumerRecords<byte[], byte[]> allRecords = 
poll(restoreConsumer, 10);
-            for (final TopicPartition partition : restoringPartitions) {
-                restorePartition(allRecords, partition, 
active.restoringTaskFor(partition));
+            if (!needsRestoring.isEmpty() && !hasRetrievedOffsets) {
+                hasRetrievedOffsets = true;
+                updatedEndOffsets = 
restoreConsumer.endOffsets(restoringPartitions);
             }
+            final ConsumerRecords<byte[], byte[]> records = 
poll(restoreConsumer, 10);
+            final Iterator<TopicPartition> iterator = 
restoringPartitions.iterator();
+            final Set<TopicPartition> completedPartitions = new HashSet<>();
+            while (iterator.hasNext()) {
+                final TopicPartition partition = iterator.next();
+                final StateRestorer restorer = stateRestorers.get(partition);
+                final long pos = processNext(records.records(partition), 
restorer, updatedEndOffsets.get(partition));
+                restorer.setRestoredOffset(pos);
+                if (restorer.hasCompleted(pos, 
updatedEndOffsets.get(partition))) {
+                    restorer.restoreDone();
+                    needsRestoring.remove(partition);
+                    completedPartitions.add(partition);
+                }
+            }
+            restoringPartitions.removeAll(completedPartitions);
         } catch (final InvalidOffsetException recoverableException) {
             log.warn("Restoring StreamTasks failed. Deleting StreamTasks 
stores to recreate from scratch.", recoverableException);
             final Set<TopicPartition> partitions = 
recoverableException.partitions();
@@ -240,41 +258,7 @@ public void reset() {
         needsRestoring.clear();
         endOffsets.clear();
         needsInitializing.clear();
-    }
-
-    /**
-     * @throws TaskMigratedException if another thread wrote to the changelog 
topic that is currently restored
-     */
-    private void restorePartition(final ConsumerRecords<byte[], byte[]> 
allRecords,
-                                  final TopicPartition topicPartition,
-                                  final Task task) {
-        final StateRestorer restorer = stateRestorers.get(topicPartition);
-        final Long endOffset = endOffsets.get(topicPartition);
-        final long pos = processNext(allRecords.records(topicPartition), 
restorer, endOffset);
-        restorer.setRestoredOffset(pos);
-        if (restorer.hasCompleted(pos, endOffset)) {
-            if (pos > endOffset) {
-                throw new TaskMigratedException(task, topicPartition, 
endOffset, pos);
-            }
-
-            // need to check for changelog topic
-            if (restorer.offsetLimit() == Long.MAX_VALUE) {
-                final Long updatedEndOffset = 
restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
-                if (!restorer.hasCompleted(pos, updatedEndOffset)) {
-                    throw new TaskMigratedException(task, topicPartition, 
updatedEndOffset, pos);
-                }
-            }
-
-
-            log.debug("Completed restoring state from changelog {} with {} 
records ranging from offset {} to {}",
-                      topicPartition,
-                      restorer.restoredNumRecords(),
-                      restorer.startingOffset(),
-                      restorer.restoredOffset());
-
-            restorer.restoreDone();
-            needsRestoring.remove(topicPartition);
-        }
+        hasRetrievedOffsets = false;
     }
 
     private long processNext(final List<ConsumerRecord<byte[], byte[]>> 
records,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 6308ca7fd80..b0a78bfa407 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -309,7 +309,6 @@ void setConsumer(final Consumer<byte[], byte[]> consumer) {
     /**
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
-     * @throws TaskMigratedException if the task producer got fenced or 
consumer discovered changelog offset changes (EOS only)
      */
     boolean updateNewAndRestoringTasks() {
         active.initializeNewTasks();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index c65d4efadb1..1739dfce4fb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -37,6 +37,7 @@
 import org.easymock.MockType;
 import org.hamcrest.CoreMatchers;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -377,6 +378,7 @@ public void 
shouldRestorePartitionsRegisteredPostInitialization() {
         assertThat(callbackTwo.restored.size(), equalTo(3));
     }
 
+    @Ignore
     @Test
     public void 
shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopic()
 {
         final int messages = 10;
@@ -396,6 +398,7 @@ public void 
shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore
     }
 
 
+    @Ignore
     @Test
     public void 
shouldThrowTaskMigratedExceptionIfChangelogTopicUpdatedDuringRestoreProcessFoundInSecondCheck()
 {
         final int messages = 10;
@@ -434,6 +437,7 @@ public void 
shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestore
     }
 
 
+    @Ignore
     @Test
     public void 
shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled()
 {
         final int totalMessages = 10;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Simplify state store recovery
> -----------------------------
>
>                 Key: KAFKA-6730
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6730
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Richard Yu
>            Priority: Major
>             Fix For: 2.0.0
>
>
> In the current code base, we restore state stores in the main thread (in 
> contrast to older code that restored state stores in the rebalance 
> callback). This has multiple advantages and allows us to further simplify 
> the restore code.
> In the original code base, during a long restore phase, it was possible that 
> an instance missed a rebalance and dropped out of the consumer group. To 
> detect this case, we apply a check during the restore phase that the 
> end-offset of the changelog topic does not change. A changed offset implies 
> a missed rebalance, as another thread started to write into the changelog 
> topic (i.e., the current thread does not own the task/store/changelog-topic 
> anymore).
> With the new code, which restores in the main loop, it's ensured that 
> `poll()` is called regularly, and thus a rebalance will be detected 
> automatically. This makes the check for a changing changelog end-offset 
> unnecessary.
> We can simplify the restore logic to just consume until `poll()` does not 
> return any data. In that case, we fetch the end-offset to see whether we 
> have fully restored. If yes, we resume processing; if not, we continue the 
> restore.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to