[ 
https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401033#comment-16401033
 ] 

ASF GitHub Bot commented on KAFKA-6106:
---------------------------------------

mjsax closed pull request #4651: KAFKA-6106: Postpone normal processing of 
tasks until restoration of all tasks completed
URL: https://github.com/apache/kafka/pull/4651
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index c806bfde47e..92045713146 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -73,27 +73,12 @@ void addNewTask(final T task) {
         created.put(task.id(), task);
     }
 
-    Set<TopicPartition> uninitializedPartitions() {
-        if (created.isEmpty()) {
-            return Collections.emptySet();
-        }
-        final Set<TopicPartition> partitions = new HashSet<>();
-        for (final Map.Entry<TaskId, T> entry : created.entrySet()) {
-            if (entry.getValue().hasStateStores()) {
-                partitions.addAll(entry.getValue().partitions());
-            }
-        }
-        return partitions;
-    }
-
     /**
-     * @return partitions that are ready to be resumed
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    Set<TopicPartition> initializeNewTasks() {
-        final Set<TopicPartition> readyPartitions = new HashSet<>();
+    void initializeNewTasks() {
         if (!created.isEmpty()) {
             log.debug("Initializing {}s {}", taskTypeName, created.keySet());
         }
@@ -104,7 +89,7 @@ void addNewTask(final T task) {
                     log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
                     addToRestoring(entry.getValue());
                 } else {
-                    transitionToRunning(entry.getValue(), readyPartitions);
+                    transitionToRunning(entry.getValue());
                 }
                 it.remove();
             } catch (final LockException e) {
@@ -112,21 +97,19 @@ void addNewTask(final T task) {
                 log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
             }
         }
-        return readyPartitions;
     }
 
-    Set<TopicPartition> updateRestored(final Collection<TopicPartition> restored) {
+    void updateRestored(final Collection<TopicPartition> restored) {
         if (restored.isEmpty()) {
-            return Collections.emptySet();
+            return;
         }
         log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
-        final Set<TopicPartition> resume = new HashSet<>();
         restoredPartitions.addAll(restored);
         for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
             final Map.Entry<TaskId, T> entry = it.next();
             final T task = entry.getValue();
             if (restoredPartitions.containsAll(task.changelogPartitions())) {
-                transitionToRunning(task, resume);
+                transitionToRunning(task);
                 it.remove();
                 log.trace("{} {} completed restoration as all its changelog partitions {} have been applied to restore state",
                         taskTypeName,
@@ -146,7 +129,6 @@ void addNewTask(final T task) {
         if (allTasksRunning()) {
             restoredPartitions.clear();
         }
-        return resume;
     }
 
     boolean allTasksRunning() {
@@ -243,7 +225,7 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>
                 suspended.remove(taskId);
                 task.resume();
                 try {
-                    transitionToRunning(task, new HashSet<TopicPartition>());
+                    transitionToRunning(task);
                 } catch (final TaskMigratedException e) {
                     // we need to catch migration exception internally since this function
                     // is triggered in the rebalance callback
@@ -278,15 +260,12 @@ private void addToRestoring(final T task) {
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
+    private void transitionToRunning(final T task) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
         task.initializeTopology();
         for (TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
-            if (task.hasStateStores()) {
-                readyPartitions.add(topicPartition);
-            }
         }
         for (TopicPartition topicPartition : task.changelogPartitions()) {
             runningByPartition.put(topicPartition, task);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 9bbc0da26a5..02a4bb92137 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -899,7 +899,7 @@ private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
             final StreamTask task = taskManager.activeTask(partition);
 
             if (task.isClosed()) {
-                log.warn("Stream task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
+                log.warn("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
                         "Notifying the thread to trigger a new rebalance immediately.", task.id());
                 throw new TaskMigratedException(task);
             }
@@ -1065,7 +1065,7 @@ private void maybeUpdateStandbyTasks(final long now) {
                         }
 
                         if (task.isClosed()) {
-                            log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
+                            log.warn("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
                                     "Notifying the thread to trigger a new rebalance immediately.", task.id());
                             throw new TaskMigratedException(task);
                         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 80df5179a37..6308ca7fd80 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -106,9 +106,9 @@ void createTasks(final Collection<TopicPartition> assignment) {
         active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
         addStreamTasks(assignment);
         addStandbyTasks();
-        final Set<TopicPartition> partitions = active.uninitializedPartitions();
-        log.trace("Pausing partitions: {}", partitions);
-        consumer.pause(partitions);
+        // Pause all the partitions until the underlying state store is ready for all the active tasks.
+        log.trace("Pausing partitions: {}", assignment);
+        consumer.pause(assignment);
     }
 
     private void addStreamTasks(final Collection<TopicPartition> assignment) {
@@ -312,18 +312,17 @@ void setConsumer(final Consumer<byte[], byte[]> consumer) {
      * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only)
      */
     boolean updateNewAndRestoringTasks() {
-        final Set<TopicPartition> resumed = active.initializeNewTasks();
+        active.initializeNewTasks();
         standby.initializeNewTasks();
 
         final Collection<TopicPartition> restored = changelogReader.restore(active);
 
-        resumed.addAll(active.updateRestored(restored));
+        active.updateRestored(restored);
 
-        if (!resumed.isEmpty()) {
-            log.trace("Resuming partitions {}", resumed);
-            consumer.resume(resumed);
-        }
         if (active.allTasksRunning()) {
+            Set<TopicPartition> assignment = consumer.assignment();
+            log.trace("Resuming partitions {}", assignment);
+            consumer.resume(assignment);
             assignStandbyPartitions();
             return true;
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 246d0477eb2..fcd23220bac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -57,36 +57,6 @@ public void before() {
         EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes();
     }
 
-    @Test
-    public void shouldGetPartitionsFromNewTasksThatHaveStateStores() {
-        EasyMock.expect(t1.hasStateStores()).andReturn(true);
-        EasyMock.expect(t2.hasStateStores()).andReturn(true);
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
-        EasyMock.replay(t1, t2);
-
-        assignedTasks.addNewTask(t1);
-        assignedTasks.addNewTask(t2);
-
-        final Set<TopicPartition> partitions = 
assignedTasks.uninitializedPartitions();
-        assertThat(partitions, equalTo(Utils.mkSet(tp1, tp2)));
-        EasyMock.verify(t1, t2);
-    }
-
-    @Test
-    public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() {
-        EasyMock.expect(t1.hasStateStores()).andReturn(false);
-        EasyMock.expect(t2.hasStateStores()).andReturn(false);
-        EasyMock.replay(t1, t2);
-
-        assignedTasks.addNewTask(t1);
-        assignedTasks.addNewTask(t2);
-
-        final Set<TopicPartition> partitions = 
assignedTasks.uninitializedPartitions();
-        assertTrue(partitions.isEmpty());
-        EasyMock.verify(t1, t2);
-    }
-
     @Test
     public void shouldInitializeNewTasks() {
         EasyMock.expect(t1.initializeStateStores()).andReturn(false);
@@ -112,19 +82,17 @@ public void 
shouldMoveInitializedTasksNeedingRestoreToRestoring() {
         final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
         EasyMock.expect(t2.partitions()).andReturn(t2partitions);
         
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
-        EasyMock.expect(t2.hasStateStores()).andReturn(true);
 
         EasyMock.replay(t1, t2);
 
         assignedTasks.addNewTask(t1);
         assignedTasks.addNewTask(t2);
 
-        final Set<TopicPartition> readyPartitions = 
assignedTasks.initializeNewTasks();
+        assignedTasks.initializeNewTasks();
 
         Collection<StreamTask> restoring = assignedTasks.restoringTasks();
         assertThat(restoring.size(), equalTo(1));
         assertSame(restoring.iterator().next(), t1);
-        assertThat(readyPartitions, equalTo(t2partitions));
     }
 
     @Test
@@ -134,15 +102,13 @@ public void 
shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
         EasyMock.expectLastCall().once();
         EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
         
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
-        EasyMock.expect(t2.hasStateStores()).andReturn(false);
 
         EasyMock.replay(t2);
 
         assignedTasks.addNewTask(t2);
-        final Set<TopicPartition> toResume = 
assignedTasks.initializeNewTasks();
+        assignedTasks.initializeNewTasks();
 
         assertThat(assignedTasks.runningTaskIds(), 
equalTo(Collections.singleton(taskId2)));
-        assertThat(toResume, equalTo(Collections.<TopicPartition>emptySet()));
     }
 
     @Test
@@ -158,9 +124,9 @@ public void shouldTransitionFullyRestoredTasksToRunning() {
 
         addAndInitTask();
 
-        
assertTrue(assignedTasks.updateRestored(Utils.mkSet(changeLog1)).isEmpty());
-        Set<TopicPartition> partitions = 
assignedTasks.updateRestored(Utils.mkSet(changeLog2));
-        assertThat(partitions, equalTo(task1Partitions));
+        assignedTasks.updateRestored(Utils.mkSet(changeLog1));
+        assertThat(assignedTasks.runningTaskIds(), 
equalTo(Collections.<TaskId>emptySet()));
+        assignedTasks.updateRestored(Utils.mkSet(changeLog2));
         assertThat(assignedTasks.runningTaskIds(), 
equalTo(Collections.singleton(taskId1)));
     }
 
@@ -282,7 +248,6 @@ private void mockTaskInitialization() {
         EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
-        EasyMock.expect(t1.hasStateStores()).andReturn(false);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 482b764f3ff..b22d98ee41c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -158,6 +158,10 @@ public void testPartitionAssignmentChangeForSingleGroup() {
         // assign single partition
         assignedPartitions = Collections.singletonList(t1p1);
         thread.taskManager().setAssignmentMetadata(Collections.<TaskId, 
Set<TopicPartition>>emptyMap(), Collections.<TaskId, 
Set<TopicPartition>>emptyMap());
+
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
         assertEquals(thread.state(), StreamThread.State.RUNNING);
@@ -378,8 +382,13 @@ public void 
shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEo
         activeTasks.put(task2, Collections.singleton(t1p2));
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.taskManager().createTasks(assignedPartitions);
 
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+        beginOffsets.put(t1p1, 0L);
+        beginOffsets.put(t1p2, 0L);
+        mockConsumer.updateBeginningOffsets(beginOffsets);
         thread.rebalanceListener.onPartitionsAssigned(new 
HashSet<>(assignedPartitions));
 
         assertEquals(1, clientSupplier.producers.size());
@@ -411,6 +420,12 @@ public void 
shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable()
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
 
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+        beginOffsets.put(t1p1, 0L);
+        beginOffsets.put(t1p2, 0L);
+        mockConsumer.updateBeginningOffsets(beginOffsets);
         thread.rebalanceListener.onPartitionsAssigned(new 
HashSet<>(assignedPartitions));
 
         thread.runOnce(-1);
@@ -439,7 +454,12 @@ public void 
shouldCloseAllTaskProducersOnCloseIfEosEnabled() {
         activeTasks.put(task2, Collections.singleton(t1p2));
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.taskManager().createTasks(assignedPartitions);
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+        beginOffsets.put(t1p1, 0L);
+        beginOffsets.put(t1p2, 0L);
+        mockConsumer.updateBeginningOffsets(beginOffsets);
 
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
@@ -595,6 +615,9 @@ public void 
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWh
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
 
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce(-1);
@@ -659,6 +682,10 @@ public void 
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAt
         activeTasks.put(task1, Collections.singleton(t1p1));
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce(-1);
@@ -714,8 +741,10 @@ public void 
shouldReturnActiveTaskMetadataWhileRunningState() {
         activeTasks.put(task1, Collections.singleton(t1p1));
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.taskManager().createTasks(assignedPartitions);
 
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce(-1);
@@ -883,9 +912,9 @@ public void close() { }
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
 
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
         clientSupplier.consumer.assign(assignedPartitions);
         
clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce(-1);
 
@@ -1074,17 +1103,18 @@ public void 
shouldReportSkippedRecordsForInvalidTimestamps() {
         thread.setState(StreamThread.State.RUNNING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
-        final Set<TopicPartition> assignedPartitions = 
Collections.singleton(new TopicPartition(t1p1.topic(), t1p1.partition()));
+        final Set<TopicPartition> assignedPartitions = 
Collections.singleton(t1p1);
         thread.taskManager().setAssignmentMetadata(
             Collections.singletonMap(
                 new TaskId(0, t1p1.partition()),
                 assignedPartitions),
             Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(Collections.singleton(t1p1));
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.runOnce(-1);
 
         final MetricName skippedTotalMetric = 
metrics.metricName("skipped-records-total", "stream-metrics", 
Collections.singletonMap("client-id", thread.getName()));
         assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 648e9b080cd..e8ef7ea4925 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -117,7 +117,7 @@
     public final TemporaryFolder testFolder = new TemporaryFolder();
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         taskManager = new TaskManager(changeLogReader,
                                       UUID.randomUUID(),
                                       "",
@@ -324,11 +324,9 @@ public void shouldNotAddResumedStandbyTasks() {
         verify(standby, standbyTaskCreator);
     }
 
-
     @Test
-    public void shouldPauseActiveUninitializedPartitions() {
+    public void shouldPauseActivePartitions() {
         mockSingleActiveTask();
-        
EasyMock.expect(active.uninitializedPartitions()).andReturn(taskId0Partitions);
         consumer.pause(taskId0Partitions);
         EasyMock.expectLastCall();
         replay();
@@ -415,21 +413,17 @@ public void shouldUnassignChangelogPartitionsOnShutdown() 
{
 
     @Test
     public void shouldInitializeNewActiveTasks() {
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new 
HashSet<TopicPartition>());
-        
EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
+        
active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
         EasyMock.expectLastCall();
         replay();
+
         taskManager.updateNewAndRestoringTasks();
         verify(active);
     }
 
     @Test
     public void shouldInitializeNewStandbyTasks() {
-        EasyMock.expect(standby.initializeNewTasks()).andReturn(new 
HashSet<TopicPartition>());
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new 
HashSet<TopicPartition>());
-        
EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
+        
active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
         EasyMock.expectLastCall();
         replay();
 
@@ -439,22 +433,21 @@ public void shouldInitializeNewStandbyTasks() {
 
     @Test
     public void shouldRestoreStateFromChangeLogReader() {
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new 
HashSet<TopicPartition>());
         
EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
-        EasyMock.expect(active.updateRestored(taskId0Partitions)).
-                andReturn(Collections.<TopicPartition>emptySet());
-
+        active.updateRestored(taskId0Partitions);
+        EasyMock.expectLastCall();
         replay();
+
         taskManager.updateNewAndRestoringTasks();
         verify(changeLogReader, active);
     }
 
     @Test
     public void shouldResumeRestoredPartitions() {
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new 
HashSet<TopicPartition>());
         
EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
-        EasyMock.expect(active.updateRestored(taskId0Partitions)).
-                andReturn(taskId0Partitions);
+        EasyMock.expect(active.allTasksRunning()).andReturn(true);
+        EasyMock.expect(consumer.assignment()).andReturn(taskId0Partitions);
+        
EasyMock.expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
 
         consumer.resume(taskId0Partitions);
         EasyMock.expectLastCall();
@@ -475,10 +468,7 @@ public void 
shouldAssignStandbyPartitionsWhenAllActiveTasksAreRunning() {
 
     @Test
     public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new 
HashSet<TopicPartition>());
         EasyMock.expect(active.allTasksRunning()).andReturn(false);
-        
EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
         replay();
 
         assertFalse(taskManager.updateNewAndRestoringTasks());
@@ -626,16 +616,13 @@ public void shouldPunctuateActiveTasks() {
     }
 
     @Test
-    public void shouldResumeConsumptionOfInitializedPartitions() {
-        final Set<TopicPartition> resumed = Collections.singleton(new 
TopicPartition("topic", 0));
-        EasyMock.expect(active.initializeNewTasks()).andReturn(resumed);
-        
EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
-        consumer.resume(resumed);
-        EasyMock.expectLastCall();
-
+    public void shouldNotResumeConsumptionUntilAllStoresRestored() {
+        EasyMock.expect(active.allTasksRunning()).andReturn(false);
+        Consumer<byte[], byte[]> consumer = (Consumer<byte[], byte[]>) 
EasyMock.createStrictMock(Consumer.class);
+        taskManager.setConsumer(consumer);
         EasyMock.replay(active, consumer);
 
+        // shouldn't invoke `resume` method in consumer
         taskManager.updateNewAndRestoringTasks();
         EasyMock.verify(consumer);
     }
@@ -662,10 +649,7 @@ public void shouldUpdateTasksFromPartitionAssignment() {
 
     private void mockAssignStandbyPartitions(final long offset) {
         final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new 
HashSet<TopicPartition>());
         EasyMock.expect(active.allTasksRunning()).andReturn(true);
-        
EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
         
EasyMock.expect(standby.running()).andReturn(Collections.singletonList(task));
         
EasyMock.expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0,
 offset));
         restoreConsumer.assign(taskId0Partitions);


 



> Postpone normal processing of tasks within a thread until restoration of all 
> tasks have completed
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6106
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6106
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.11.0.1, 1.0.0
>            Reporter: Guozhang Wang
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>              Labels: newbie++
>
> Let's say a stream thread hosts multiple tasks, A and B. At the very 
> beginning, when A and B are assigned to the thread, the thread state is 
> {{TASKS_ASSIGNED}}, and the thread starts restoring these two tasks during 
> this state using the restore consumer, while using the normal consumer for 
> heartbeating.
> If task A's restoration completes earlier than task B's, then the thread 
> will start processing A immediately, even though it is still in the 
> {{TASKS_ASSIGNED}} phase. But processing task A will slow down the 
> restoration of task B, since both run on the same single thread. So the 
> thread's transition to {{RUNNING}}, which happens once all of its assigned 
> tasks have completed restoring and can be processed, will be delayed.
> Note that the streams instance's state will only transition to {{RUNNING}} 
> when all of its threads have transitioned to {{RUNNING}}, so the instance's 
> transition will also be delayed by this scenario.
> We had better not start processing ready tasks immediately, but instead 
> focus on restoration during the {{TASKS_ASSIGNED}} state to shorten the 
> overall time of the instance's state transition.
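
The gating described in the issue, and implemented in the TaskManager changes of the diff, can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: the class RestoreGate, its methods, and the use of plain strings for partitions are hypothetical and are not part of Kafka Streams. It mirrors the idea that the whole assignment is paused when tasks are created, and that nothing is resumed until no task is still restoring.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the pause/resume gating; not the Kafka Streams API.
public class RestoreGate {
    private final Set<String> assignment = new HashSet<>();
    private final Set<String> paused = new HashSet<>();
    private final Set<String> restoring = new HashSet<>();

    // Analogous to createTasks() in the diff: pause the whole assignment,
    // whether or not a given task has state stores.
    void onAssignment(Set<String> assigned, Set<String> needRestore) {
        assignment.clear();
        assignment.addAll(assigned);
        paused.clear();
        paused.addAll(assigned);
        restoring.clear();
        restoring.addAll(needRestore);
    }

    // Analogous to updateNewAndRestoringTasks(): record finished restorations,
    // and resume every assigned partition only once nothing is restoring.
    boolean update(Set<String> restoredNow) {
        restoring.removeAll(restoredNow);
        if (!restoring.isEmpty()) {
            return false; // at least one task is still restoring; stay paused
        }
        paused.removeAll(assignment);
        return true;
    }

    // Defensive copy so callers cannot mutate internal state.
    Set<String> paused() {
        return new HashSet<>(paused);
    }

    public static void main(String[] args) {
        RestoreGate gate = new RestoreGate();
        Set<String> all = new HashSet<>(Arrays.asList("A-0", "B-0"));
        gate.onAssignment(all, all);
        // Task A finishes first: its partition must remain paused.
        System.out.println(gate.update(new HashSet<>(Arrays.asList("A-0"))) + " " + gate.paused().size());
        // Task B finishes: everything is resumed together.
        System.out.println(gate.update(new HashSet<>(Arrays.asList("B-0"))) + " " + gate.paused().size());
    }
}
```

In this model, task A finishing early does not unpause its partition. Both partitions are resumed together once task B has also been restored, which matches the behaviour the issue asks for.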


