This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22606a0a4d4 KAFKA-14530: Check state updater more often (#13017)
22606a0a4d4 is described below

commit 22606a0a4d45dfb37b72a40de3728778ac4ffe84
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jan 12 12:40:07 2023 +0100

    KAFKA-14530: Check state updater more often (#13017)
    
    In the new state restoration code, the state updater needs to be checked
    regularly by the main thread to transfer ownership of tasks back to the
    main thread once the state of the task is restored. The more often we
    check this, the faster we can start processing the tasks.
    
    Currently, we only check the state updater once in every loop iteration
    of the stream thread. While we have not observed this to be strictly too
    infrequent, we can easily increase the number of checks by moving the
    check inside the inner processing loop. This means that once we have
    iterated over `numIterations` records, we can already start processing
    tasks that have finished restoration in the meantime.
    
    Reviewer: Bruno Cadonna <[email protected]>
---
 .../streams/processor/internals/StreamThread.java  | 51 ++++++++++++----------
 .../processor/internals/StreamThreadTest.java      | 16 +++++++
 2 files changed, 43 insertions(+), 24 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 1ea75d17dc0..2d433eb3c82 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -770,7 +770,9 @@ public class StreamThread extends Thread {
             return;
         }
 
-        initializeAndRestorePhase();
+        if (!stateUpdaterEnabled) {
+            initializeAndRestorePhase();
+        }
 
         // TODO: we should record the restore latency and its relative time 
spent ratio after
         //       we figure out how to move this method out of the stream thread
@@ -792,6 +794,11 @@ public class StreamThread extends Thread {
              *  6. Otherwise, increment N.
              */
             do {
+
+                if (stateUpdaterEnabled) {
+                    checkStateUpdater();
+                }
+
                 log.debug("Processing tasks with {} iterations.", 
numIterations);
                 final int processed = taskManager.process(numIterations, time);
                 final long processLatency = advanceNowAndComputeLatency();
@@ -880,36 +887,32 @@ public class StreamThread extends Thread {
     private void initializeAndRestorePhase() {
         final java.util.function.Consumer<Set<TopicPartition>> offsetResetter 
= partitions -> resetOffsets(partitions, null);
         final State stateSnapshot = state;
-        if (stateUpdaterEnabled) {
-            checkStateUpdater();
-        } else {
-            // only try to initialize the assigned tasks
-            // if the state is still in PARTITION_ASSIGNED after the poll call
-            if (stateSnapshot == State.PARTITIONS_ASSIGNED
-                || stateSnapshot == State.RUNNING && 
taskManager.needsInitializationOrRestoration()) {
+        // only try to initialize the assigned tasks
+        // if the state is still in PARTITION_ASSIGNED after the poll call
+        if (stateSnapshot == State.PARTITIONS_ASSIGNED
+            || stateSnapshot == State.RUNNING && 
taskManager.needsInitializationOrRestoration()) {
 
-                log.debug("State is {}; initializing tasks if necessary", 
stateSnapshot);
+            log.debug("State is {}; initializing tasks if necessary", 
stateSnapshot);
 
-                if (taskManager.tryToCompleteRestoration(now, offsetResetter)) 
{
-                    log.info("Restoration took {} ms for all tasks {}", 
time.milliseconds() - lastPartitionAssignedMs,
-                        taskManager.allTasks().keySet());
-                    setState(State.RUNNING);
-                }
-
-                if (log.isDebugEnabled()) {
-                    log.debug("Initialization call done. State is {}", state);
-                }
+            if (taskManager.tryToCompleteRestoration(now, offsetResetter)) {
+                log.info("Restoration took {} ms for all tasks {}", 
time.milliseconds() - lastPartitionAssignedMs,
+                    taskManager.allTasks().keySet());
+                setState(State.RUNNING);
             }
 
             if (log.isDebugEnabled()) {
-                log.debug("Idempotently invoking restoration logic in state 
{}", state);
+                log.debug("Initialization call done. State is {}", state);
             }
-            // we can always let changelog reader try restoring in order to 
initialize the changelogs;
-            // if there's no active restoring or standby updating it would not 
try to fetch any data
-            // After KAFKA-13873, we only restore the not paused tasks.
-            changelogReader.restore(taskManager.notPausedTasks());
-            log.debug("Idempotent restore call done. Thread state has not 
changed.");
         }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Idempotently invoking restoration logic in state {}", 
state);
+        }
+        // we can always let changelog reader try restoring in order to 
initialize the changelogs;
+        // if there's no active restoring or standby updating it would not try 
to fetch any data
+        // After KAFKA-13873, we only restore the not paused tasks.
+        changelogReader.restore(taskManager.notPausedTasks());
+        log.debug("Idempotent restore call done. Thread state has not 
changed.");
     }
 
     private void checkStateUpdater() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 7ecac4d7ecf..76574cccb70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -149,6 +149,7 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -3005,6 +3006,21 @@ public class StreamThreadTest {
         Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any());
     }
 
+    @Test
+    public void shouldCheckStateUpdaterInBetweenProcessCalls() {
+        final Properties streamsConfigProps = 
StreamsTestUtils.getStreamsConfig();
+        streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, true);
+        final StreamThread streamThread = setUpThread(streamsConfigProps);
+        final TaskManager taskManager = streamThread.taskManager();
+        streamThread.setState(State.STARTING);
+        // non-zero return of process will cause a second call to process
+        when(taskManager.process(Mockito.anyInt(), 
Mockito.any())).thenReturn(1).thenReturn(0);
+
+        streamThread.runOnce();
+
+        Mockito.verify(taskManager, 
times(2)).checkStateUpdater(Mockito.anyLong(), Mockito.any());
+    }
+
     @Test
     public void 
shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() {
         final Properties streamsConfigProps = 
StreamsTestUtils.getStreamsConfig();

Reply via email to