This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22606a0a4d4 KAFKA-14530: Check state updater more often (#13017)
22606a0a4d4 is described below
commit 22606a0a4d45dfb37b72a40de3728778ac4ffe84
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jan 12 12:40:07 2023 +0100
KAFKA-14530: Check state updater more often (#13017)
In the new state restoration code, the state updater needs to be checked
regularly by the main thread to transfer ownership of tasks back to the
main thread once the state of the task is restored. The more often we
check this, the faster we can start processing the tasks.
Currently, we only check the state updater once in every loop iteration of
the stream thread. While we have not observed this to be too infrequent in
practice, we can easily increase the number of checks by moving the check
inside the inner processing loop. This means that once we have processed
`numIterations` records, we can already start processing tasks that have
finished restoration in the meantime.
Reviewer: Bruno Cadonna <[email protected]>
---
.../streams/processor/internals/StreamThread.java | 51 ++++++++++++----------
.../processor/internals/StreamThreadTest.java | 16 +++++++
2 files changed, 43 insertions(+), 24 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 1ea75d17dc0..2d433eb3c82 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -770,7 +770,9 @@ public class StreamThread extends Thread {
return;
}
- initializeAndRestorePhase();
+ if (!stateUpdaterEnabled) {
+ initializeAndRestorePhase();
+ }
// TODO: we should record the restore latency and its relative time
spent ratio after
// we figure out how to move this method out of the stream thread
@@ -792,6 +794,11 @@ public class StreamThread extends Thread {
* 6. Otherwise, increment N.
*/
do {
+
+ if (stateUpdaterEnabled) {
+ checkStateUpdater();
+ }
+
log.debug("Processing tasks with {} iterations.",
numIterations);
final int processed = taskManager.process(numIterations, time);
final long processLatency = advanceNowAndComputeLatency();
@@ -880,36 +887,32 @@ public class StreamThread extends Thread {
private void initializeAndRestorePhase() {
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter
= partitions -> resetOffsets(partitions, null);
final State stateSnapshot = state;
- if (stateUpdaterEnabled) {
- checkStateUpdater();
- } else {
- // only try to initialize the assigned tasks
- // if the state is still in PARTITION_ASSIGNED after the poll call
- if (stateSnapshot == State.PARTITIONS_ASSIGNED
- || stateSnapshot == State.RUNNING &&
taskManager.needsInitializationOrRestoration()) {
+ // only try to initialize the assigned tasks
+ // if the state is still in PARTITION_ASSIGNED after the poll call
+ if (stateSnapshot == State.PARTITIONS_ASSIGNED
+ || stateSnapshot == State.RUNNING &&
taskManager.needsInitializationOrRestoration()) {
- log.debug("State is {}; initializing tasks if necessary",
stateSnapshot);
+ log.debug("State is {}; initializing tasks if necessary",
stateSnapshot);
- if (taskManager.tryToCompleteRestoration(now, offsetResetter))
{
- log.info("Restoration took {} ms for all tasks {}",
time.milliseconds() - lastPartitionAssignedMs,
- taskManager.allTasks().keySet());
- setState(State.RUNNING);
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Initialization call done. State is {}", state);
- }
+ if (taskManager.tryToCompleteRestoration(now, offsetResetter)) {
+ log.info("Restoration took {} ms for all tasks {}",
time.milliseconds() - lastPartitionAssignedMs,
+ taskManager.allTasks().keySet());
+ setState(State.RUNNING);
}
if (log.isDebugEnabled()) {
- log.debug("Idempotently invoking restoration logic in state
{}", state);
+ log.debug("Initialization call done. State is {}", state);
}
- // we can always let changelog reader try restoring in order to
initialize the changelogs;
- // if there's no active restoring or standby updating it would not
try to fetch any data
- // After KAFKA-13873, we only restore the not paused tasks.
- changelogReader.restore(taskManager.notPausedTasks());
- log.debug("Idempotent restore call done. Thread state has not
changed.");
}
+
+ if (log.isDebugEnabled()) {
+ log.debug("Idempotently invoking restoration logic in state {}",
state);
+ }
+ // we can always let changelog reader try restoring in order to
initialize the changelogs;
+ // if there's no active restoring or standby updating it would not try
to fetch any data
+ // After KAFKA-13873, we only restore the not paused tasks.
+ changelogReader.restore(taskManager.notPausedTasks());
+ log.debug("Idempotent restore call done. Thread state has not
changed.");
}
private void checkStateUpdater() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 7ecac4d7ecf..76574cccb70 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -149,6 +149,7 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -3005,6 +3006,21 @@ public class StreamThreadTest {
Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any());
}
+ @Test
+ public void shouldCheckStateUpdaterInBetweenProcessCalls() {
+ final Properties streamsConfigProps =
StreamsTestUtils.getStreamsConfig();
+ streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, true);
+ final StreamThread streamThread = setUpThread(streamsConfigProps);
+ final TaskManager taskManager = streamThread.taskManager();
+ streamThread.setState(State.STARTING);
+ // non-zero return of process will cause a second call to process
+ when(taskManager.process(Mockito.anyInt(),
Mockito.any())).thenReturn(1).thenReturn(0);
+
+ streamThread.runOnce();
+
+ Mockito.verify(taskManager,
times(2)).checkStateUpdater(Mockito.anyLong(), Mockito.any());
+ }
+
@Test
public void
shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() {
final Properties streamsConfigProps =
StreamsTestUtils.getStreamsConfig();