This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6263197a623 KAFKA-15326: [9/N] Start and stop executors and
cornercases (#14281)
6263197a623 is described below
commit 6263197a62389c9dce0210cd9d65a6e601345edc
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Oct 2 15:41:21 2023 +0200
KAFKA-15326: [9/N] Start and stop executors and cornercases (#14281)
* Implements start and stop of task executors
* Introduce flush operation to keep consumer operations out of the
processing threads
* Fixes corner case: handle requested unassignment during shutdown
* Fixes corner case: handle race between voluntary unassignment and
requested unassignment
* Fixes corner case: task locking future completes for the empty set
* Fixes corner case: we should not reassign a task with an uncaught
exception to a task executor
* Improved logging
* Number of threads controlled from outside of the TaskManager
Reviewers: Bruno Cadonna <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../streams/processor/internals/StreamTask.java | 9 +-
.../internals/tasks/DefaultTaskExecutor.java | 90 +++++++++++++-----
.../internals/tasks/DefaultTaskManager.java | 66 +++++++++++---
.../processor/internals/tasks/TaskExecutor.java | 21 ++++-
.../processor/internals/tasks/TaskManager.java | 27 +++++-
.../processor/internals/StreamTaskTest.java | 13 +++
.../internals/tasks/DefaultTaskExecutorTest.java | 50 ++++++++--
.../internals/tasks/DefaultTaskManagerTest.java | 101 +++++++++++++++++----
9 files changed, 313 insertions(+), 66 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f01281acb0e..29ef2ff0ad8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -235,7 +235,7 @@
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|KTableKTableForeignKeyVersionedJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
<suppress checks="JavaNCSS"
-
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>
+
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest|StreamTaskTest).java"/>
<suppress checks="NPathComplexity"
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|KTableKTableForeignKeyVersionedJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest).java"/>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 022ddbcf4c4..11d658b3d8f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -394,6 +394,12 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
timeCurrentIdlingStarted = Optional.empty();
}
+
+ public void flush() {
+ stateMgr.flushCache();
+ recordCollector.flush();
+ }
+
/**
* @throws StreamsException fatal error that should cause the thread to die
* @throws TaskMigratedException recoverable error that would cause the
task to be removed
@@ -414,8 +420,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
// cached records to be processed and hence generate more
records to be sent out
//
// TODO: this should be removed after we decouple caching
with emitting
- stateMgr.flushCache();
- recordCollector.flush();
+ flush();
hasPendingTxCommit = eosEnabled;
log.debug("Prepared {} task for committing", state());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
index 01e7b6fa902..09445d16076 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
@@ -41,8 +41,8 @@ public class DefaultTaskExecutor implements TaskExecutor {
private class TaskExecutorThread extends Thread {
- private final AtomicBoolean isRunning = new AtomicBoolean(true);
- private final AtomicReference<KafkaFutureImpl<StreamTask>>
pauseRequested = new AtomicReference<>(null);
+ private final AtomicBoolean shutdownRequested = new
AtomicBoolean(false);
+ private final AtomicReference<KafkaFutureImpl<StreamTask>>
taskReleaseRequested = new AtomicReference<>(null);
private final Logger log;
@@ -57,26 +57,52 @@ public class DefaultTaskExecutor implements TaskExecutor {
public void run() {
log.info("Task executor thread started");
try {
- while (isRunning.get()) {
- runOnce(time.milliseconds());
+ while (!shutdownRequested.get()) {
+ try {
+ runOnce(time.milliseconds());
+ } catch (final StreamsException e) {
+ handleException(e);
+ } catch (final Exception e) {
+ handleException(new StreamsException(e));
+ }
}
- } catch (final StreamsException e) {
- handleException(e);
- } catch (final Exception e) {
- handleException(new StreamsException(e));
} finally {
if (currentTask != null) {
+ log.debug("Releasing task {} due to shutdown.",
currentTask.id());
unassignCurrentTask();
}
shutdownGate.countDown();
+
+ final KafkaFutureImpl<StreamTask> taskReleaseFuture;
+ if ((taskReleaseFuture = taskReleaseRequested.getAndSet(null))
!= null) {
+ log.debug("Asked to return current task, but shutting
down.");
+ taskReleaseFuture.complete(null);
+ }
log.info("Task executor thread shutdown");
}
}
+ private void handleTaskReleaseRequested() {
+ final KafkaFutureImpl<StreamTask> taskReleaseFuture;
+ if ((taskReleaseFuture = taskReleaseRequested.getAndSet(null)) !=
null) {
+ if (currentTask != null) {
+ log.debug("Releasing task {} upon request.",
currentTask.id());
+ final StreamTask unassignedTask = unassignCurrentTask();
+ taskReleaseFuture.complete(unassignedTask);
+ } else {
+ log.debug("Asked to return current task, but returned
current task already.");
+ taskReleaseFuture.complete(null);
+ }
+ }
+ }
+
private void handleException(final StreamsException e) {
if (currentTask != null) {
taskManager.setUncaughtException(e, currentTask.id());
+
+ log.debug("Releasing task {} due to uncaught exception.",
currentTask.id());
+ unassignCurrentTask();
} else {
// If we do not currently have a task assigned and still get
an error, this is fatal for the executor thread
throw e;
@@ -84,11 +110,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
}
private void runOnce(final long nowMs) {
- final KafkaFutureImpl<StreamTask> pauseFuture;
- if ((pauseFuture = pauseRequested.getAndSet(null)) != null) {
- final StreamTask unassignedTask = unassignCurrentTask();
- pauseFuture.complete(unassignedTask);
- }
+ handleTaskReleaseRequested();
if (currentTask == null) {
currentTask =
taskManager.assignNextTask(DefaultTaskExecutor.this);
@@ -105,6 +127,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
if (taskExecutionMetadata.canProcessTask(currentTask, nowMs)
&& currentTask.isProcessable(nowMs)) {
if (processTask(currentTask, nowMs, time)) {
+ log.trace("processed a record for {}",
currentTask.id());
progressed = true;
}
}
@@ -121,6 +144,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
}
if (!progressed) {
+ log.debug("Releasing task {} because we are not making
progress.", currentTask.id());
unassignCurrentTask();
}
}
@@ -131,10 +155,10 @@ public class DefaultTaskExecutor implements TaskExecutor {
try {
processed = task.process(now);
if (processed) {
+ log.trace("Successfully processed task {}", task.id());
task.clearTaskTimeout();
// TODO: enable regardless of whether using named
topologies
if (taskExecutionMetadata.hasNamedTopologies() &&
taskExecutionMetadata.processingMode() != EXACTLY_ONCE_V2) {
- log.trace("Successfully processed task {}", task.id());
taskExecutionMetadata.addToSuccessfullyProcessed(task);
}
}
@@ -168,9 +192,10 @@ public class DefaultTaskExecutor implements TaskExecutor {
if (currentTask == null)
throw new IllegalStateException("Does not own any task while
being ask to unassign from task manager");
- // flush the task before giving it back to task manager
- // TODO: we can add a separate function in StreamTask to just
flush and not return offsets
- currentTask.prepareCommit();
+ // flush the task before giving it back to task manager, if we are
not handing it back because of an error.
+ if (!taskManager.hasUncaughtException(currentTask.id())) {
+ currentTask.flush();
+ }
taskManager.unassignTask(currentTask, DefaultTaskExecutor.this);
final StreamTask retTask = currentTask;
@@ -183,6 +208,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
private final String name;
private final TaskManager taskManager;
private final TaskExecutionMetadata taskExecutionMetadata;
+ private final Logger log;
private StreamTask currentTask = null;
private TaskExecutorThread taskExecutorThread = null;
@@ -196,6 +222,8 @@ public class DefaultTaskExecutor implements TaskExecutor {
this.name = name;
this.taskManager = taskManager;
this.taskExecutionMetadata = taskExecutionMetadata;
+ final LogContext logContext = new LogContext(name);
+ this.log = logContext.logger(DefaultTaskExecutor.class);
}
@Override
@@ -213,10 +241,20 @@ public class DefaultTaskExecutor implements TaskExecutor {
}
@Override
- public void shutdown(final Duration timeout) {
+ public boolean isRunning() {
+ return taskExecutorThread != null && taskExecutorThread.isAlive() &&
shutdownGate.getCount() != 0;
+ }
+
+ @Override
+ public void requestShutdown() {
+ if (taskExecutorThread != null) {
+ taskExecutorThread.shutdownRequested.set(true);
+ }
+ }
+
+ @Override
+ public void awaitShutdown(final Duration timeout) {
if (taskExecutorThread != null) {
- taskExecutorThread.isRunning.set(false);
- taskExecutorThread.interrupt();
try {
if (!shutdownGate.await(timeout.toMillis(),
TimeUnit.MILLISECONDS)) {
throw new StreamsException("State updater thread did not
shutdown within the timeout");
@@ -237,8 +275,18 @@ public class DefaultTaskExecutor implements TaskExecutor {
final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
if (taskExecutorThread != null) {
- taskExecutorThread.pauseRequested.set(future);
+ log.debug("Asking {} to hand back task",
taskExecutorThread.getName());
+ if (!taskExecutorThread.taskReleaseRequested.compareAndSet(null,
future)) {
+ throw new IllegalStateException("There was already a task
release request registered");
+ }
+ if (shutdownGate.getCount() == 0) {
+ log.debug("Completing future, because task executor was just
shut down");
+ future.complete(null);
+ } else {
+ taskManager.signalTaskExecutors();
+ }
} else {
+ log.debug("Tried to unassign but no thread is running");
future.complete(null);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
index 4f2d22d4711..56749b4f5f0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -16,12 +16,12 @@
*/
package org.apache.kafka.streams.processor.internals.tasks;
+import java.time.Duration;
import java.util.concurrent.locks.Condition;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
@@ -65,7 +65,7 @@ public class DefaultTaskManager implements TaskManager {
private final List<TaskExecutor> taskExecutors;
- static class DefaultTaskExecutorCreator implements TaskExecutorCreator {
+ public static class DefaultTaskExecutorCreator implements
TaskExecutorCreator {
@Override
public TaskExecutor create(final TaskManager taskManager, final String
name, final Time time, final TaskExecutionMetadata taskExecutionMetadata) {
return new DefaultTaskExecutor(taskManager, name, time,
taskExecutionMetadata);
@@ -76,9 +76,9 @@ public class DefaultTaskManager implements TaskManager {
public DefaultTaskManager(final Time time,
final String clientId,
final TasksRegistry tasks,
- final StreamsConfig config,
final TaskExecutorCreator executorCreator,
- final TaskExecutionMetadata taskExecutionMetadata
+ final TaskExecutionMetadata
taskExecutionMetadata,
+ final int numExecutors
) {
final String logPrefix = String.format("%s ", clientId);
final LogContext logContext = new LogContext(logPrefix);
@@ -87,7 +87,6 @@ public class DefaultTaskManager implements TaskManager {
this.tasks = tasks;
this.taskExecutionMetadata = taskExecutionMetadata;
- final int numExecutors =
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
this.taskExecutors = new ArrayList<>(numExecutors);
for (int i = 1; i <= numExecutors; i++) {
final String name = clientId + "-TaskExecutor-" + i;
@@ -106,7 +105,8 @@ public class DefaultTaskManager implements TaskManager {
for (final Task task : tasks.activeTasks()) {
if (!assignedTasks.containsKey(task.id()) &&
!lockedTasks.contains(task.id()) &&
- canProgress((StreamTask) task, time.milliseconds())
+ canProgress((StreamTask) task, time.milliseconds()) &&
+ !hasUncaughtException(task.id())
) {
assignedTasks.put(task.id(), executor);
@@ -117,6 +117,8 @@ public class DefaultTaskManager implements TaskManager {
}
}
+ log.debug("Found no assignable task for executor {}",
executor.name());
+
return null;
});
}
@@ -127,7 +129,8 @@ public class DefaultTaskManager implements TaskManager {
for (final Task task : tasks.activeTasks()) {
if (!assignedTasks.containsKey(task.id()) &&
!lockedTasks.contains(task.id()) &&
- canProgress((StreamTask) task, time.milliseconds())
+ canProgress((StreamTask) task, time.milliseconds()) &&
+ !hasUncaughtException(task.id())
) {
log.debug("Await unblocked: returning early from await
since a processable task {} was found", task.id());
return false;
@@ -151,7 +154,7 @@ public class DefaultTaskManager implements TaskManager {
}
}
- public void signalProcessableTasks() {
+ public void signalTaskExecutors() {
log.debug("Waking up task executors");
executeWithTasksLocked(tasksCondition::signalAll);
}
@@ -177,10 +180,16 @@ public class DefaultTaskManager implements TaskManager {
@Override
public KafkaFuture<Void> lockTasks(final Set<TaskId> taskIds) {
+ final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+
+ if (taskIds.isEmpty()) {
+ result.complete(null);
+ return result;
+ }
+
return returnWithTasksLocked(() -> {
lockedTasks.addAll(taskIds);
- final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
final Set<TaskId> remainingTaskIds = new
ConcurrentSkipListSet<>(taskIds);
for (final TaskId taskId : taskIds) {
@@ -195,12 +204,18 @@ public class DefaultTaskManager implements TaskManager {
}
if (assignedTasks.containsKey(taskId)) {
- final KafkaFuture<StreamTask> future =
assignedTasks.get(taskId).unassign();
+ final TaskExecutor executor = assignedTasks.get(taskId);
+ log.debug("Requesting release of task {} from {}", taskId,
executor.name());
+ final KafkaFuture<StreamTask> future = executor.unassign();
future.whenComplete((streamTask, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
- remainingTaskIds.remove(streamTask.id());
+ assert !assignedTasks.containsKey(taskId);
+ // It can happen that the executor handed back the
task before we asked it to
+ // in which case `streamTask` will be null here.
+ assert streamTask == null || streamTask.id() ==
taskId;
+ remainingTaskIds.remove(taskId);
if (remainingTaskIds.isEmpty()) {
result.complete(null);
}
@@ -227,6 +242,11 @@ public class DefaultTaskManager implements TaskManager {
@Override
public void unlockTasks(final Set<TaskId> taskIds) {
+
+ if (taskIds.isEmpty()) {
+ return;
+ }
+
executeWithTasksLocked(() -> {
lockedTasks.removeAll(taskIds);
log.debug("Waking up task executors");
@@ -306,11 +326,17 @@ public class DefaultTaskManager implements TaskManager {
return result;
});
- log.debug("Drained {} uncaught exceptions", returnValue.size());
+ if (!returnValue.isEmpty()) {
+ log.debug("Drained {} uncaught exceptions", returnValue.size());
+ }
return returnValue;
}
+ public boolean hasUncaughtException(final TaskId taskId) {
+ return returnWithTasksLocked(() ->
uncaughtExceptions.containsKey(taskId));
+ }
+
private void executeWithTasksLocked(final Runnable action) {
tasksLock.lock();
try {
@@ -334,5 +360,21 @@ public class DefaultTaskManager implements TaskManager {
taskExecutionMetadata.canProcessTask(task, nowMs) &&
task.isProcessable(nowMs) ||
taskExecutionMetadata.canPunctuateTask(task) &&
(task.canPunctuateStreamTime() || task.canPunctuateSystemTime());
}
+
+ public void startTaskExecutors() {
+ for (final TaskExecutor t: taskExecutors) {
+ t.start();
+ }
+ }
+
+ public void shutdown(final Duration duration) {
+ for (final TaskExecutor t: taskExecutors) {
+ t.requestShutdown();
+ }
+ signalTaskExecutors();
+ for (final TaskExecutor t: taskExecutors) {
+ t.awaitShutdown(duration);
+ }
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
index 04538744a29..aada2fa2341 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
@@ -31,18 +31,35 @@ public interface TaskExecutor {
/**
* Starts the task executor.
+ * Idempotent operation - will have no effect if thread is already started.
*/
void start();
+ /**
+ * Returns true if the task executor thread is running.
+ */
+ boolean isRunning();
+
+ /**
+ * Asks the task executor to shut down.
+ * Idempotent operation - will have no effect if thread was already asked
to shut down
+ *
+ * @throws
+ * org.apache.kafka.streams.errors.StreamsException if the state
updater thread cannot shutdown within the timeout
+ */
+ void requestShutdown();
+
/**
* Shuts down the task processor updater.
+ * Idempotent operation - will have no effect if thread is already shut
down.
+ * Must call `requestShutdown` first.
*
* @param timeout duration how long to wait until the state updater is
shut down
*
* @throws
- * org.apache.kafka.streams.errors.StreamsException if the state
updater thread cannot shutdown within the timeout
+ * org.apache.kafka.streams.errors.StreamsException if the state
updater thread does not shutdown within the timeout
*/
- void shutdown(final Duration timeout);
+ void awaitShutdown(final Duration timeout);
/**
* Get the current assigned processing task. The task returned is
read-only and cannot be modified.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
index 63e9f2baecf..6a52f71dd62 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.tasks;
+import java.time.Duration;
import java.util.Map;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.streams.errors.StreamsException;
@@ -59,7 +60,7 @@ public interface TaskManager {
KafkaFuture<Void> lockTasks(final Set<TaskId> taskIds);
/**
- * Lock all of the managed active tasks from the task manager. Similar to
{@link #lockTasks(Set)}.
+ * Lock all the managed active tasks from the task manager. Similar to
{@link #lockTasks(Set)}.
*
* This method does not block, instead a future is returned.
*/
@@ -71,7 +72,7 @@ public interface TaskManager {
void unlockTasks(final Set<TaskId> taskIds);
/**
- * Unlock all of the managed active tasks from the task manager. Similar
to {@link #unlockTasks(Set)}.
+ * Unlock all the managed active tasks from the task manager. Similar to
{@link #unlockTasks(Set)}.
*/
void unlockAllTasks();
@@ -119,14 +120,34 @@ public interface TaskManager {
*/
Map<TaskId, StreamsException> drainUncaughtExceptions();
+ /**
+ * Can be used to check if a specific task has an uncaught exception.
+ *
+ * @param taskId the task ID to check for
+ */
+ boolean hasUncaughtException(final TaskId taskId);
+
/**
* Signals that at least one task has become processable, e.g. because it
was resumed or new records may be available.
*/
- void signalProcessableTasks();
+ void signalTaskExecutors();
/**
* Blocks until unassigned processable tasks may be available.
*/
void awaitProcessableTasks() throws InterruptedException;
+ /**
+ * Starts all threads associated with this task manager.
+ */
+ void startTaskExecutors();
+
+ /**
+ * Shuts down all threads associated with this task manager.
+ * All tasks will be unlocked and unassigned by the end of this.
+ *
+ * @param duration Time to wait for each thread to shut down.
+ */
+ void shutdown(final Duration duration);
+
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 7ed11a61ece..e295d29e419 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -2323,6 +2323,19 @@ public class StreamTaskTest {
assertThat(task.state(), is(Task.State.CLOSED));
}
+ @Test
+ public void shouldFlushStateManagerAndRecordCollector() {
+ stateManager.flush();
+ EasyMock.expectLastCall().once();
+ recordCollector.flush();
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(stateManager, recordCollector);
+
+ task = createStatefulTask(createConfig("100"), false);
+
+ task.flush();
+ }
+
@Test
public void shouldClearCommitStatusesInCloseDirty() {
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"),
StreamsConfig.METRICS_LATEST);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
index d24f699c3da..a697aabf3bc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.junit.jupiter.api.AfterEach;
@@ -31,6 +32,7 @@ import java.time.Duration;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -43,6 +45,8 @@ import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
public class DefaultTaskExecutorTest {
private final static long VERIFICATION_TIMEOUT = 15000;
@@ -60,29 +64,51 @@ public class DefaultTaskExecutorTest {
when(taskManager.assignNextTask(taskExecutor)).thenReturn(task).thenReturn(null);
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
when(task.isProcessable(anyLong())).thenReturn(true);
+ when(task.id()).thenReturn(new TaskId(0, 0, "A"));
when(task.process(anyLong())).thenReturn(true);
when(task.prepareCommit()).thenReturn(Collections.emptyMap());
}
@AfterEach
public void tearDown() {
- taskExecutor.shutdown(Duration.ofMinutes(1));
+ taskExecutor.requestShutdown();
+ taskExecutor.awaitShutdown(Duration.ofMinutes(1));
}
@Test
public void shouldShutdownTaskExecutor() {
assertNull(taskExecutor.currentTask(), "Have task assigned before
startup");
+ assertFalse(taskExecutor.isRunning());
taskExecutor.start();
+ assertTrue(taskExecutor.isRunning());
verify(taskManager,
timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
- taskExecutor.shutdown(Duration.ofMinutes(1));
+ taskExecutor.requestShutdown();
+ taskExecutor.awaitShutdown(Duration.ofMinutes(1));
- verify(task).prepareCommit();
+ verify(task).flush();
verify(taskManager).unassignTask(task, taskExecutor);
assertNull(taskExecutor.currentTask(), "Have task assigned after
shutdown");
+ assertFalse(taskExecutor.isRunning());
+ }
+
+ @Test
+ public void shouldClearTaskReleaseFutureOnShutdown() throws
InterruptedException {
+ assertNull(taskExecutor.currentTask(), "Have task assigned before
startup");
+
+ taskExecutor.start();
+
+ verify(taskManager,
timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
+
+ final KafkaFuture<StreamTask> future = taskExecutor.unassign();
+ taskExecutor.requestShutdown();
+ taskExecutor.awaitShutdown(Duration.ofMinutes(1));
+
+ waitForCondition(future::isDone, "Await for unassign future to
complete");
+ assertNull(taskExecutor.currentTask(), "Have task assigned after
shutdown");
}
@Test
@@ -104,7 +130,7 @@ public class DefaultTaskExecutorTest {
taskExecutor.start();
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task,
taskExecutor);
- verify(task).prepareCommit();
+ verify(task).flush();
assertNull(taskExecutor.currentTask());
}
@@ -209,7 +235,7 @@ public class DefaultTaskExecutorTest {
final KafkaFuture<StreamTask> future = taskExecutor.unassign();
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task,
taskExecutor);
- verify(task).prepareCommit();
+ verify(task).flush();
assertNull(taskExecutor.currentTask());
assertTrue(future.isDone(), "Unassign is not completed");
@@ -223,10 +249,22 @@ public class DefaultTaskExecutorTest {
taskExecutor.start();
- verify(taskManager,
timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
verify(taskManager,
timeout(VERIFICATION_TIMEOUT)).setUncaughtException(exception, task.id());
verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task,
taskExecutor);
assertNull(taskExecutor.currentTask());
+ assertTrue(taskExecutor.isRunning(), "should not shut down upon
exception");
+ }
+
+ @Test
+ public void shouldNotFlushOnException() {
+ final StreamsException exception = mock(StreamsException.class);
+ when(task.process(anyLong())).thenThrow(exception);
+ when(taskManager.hasUncaughtException(task.id())).thenReturn(true);
+
+ taskExecutor.start();
+
+ verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task,
taskExecutor);
+ verify(task, never()).flush();
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
index 5b943334ff5..e0b8a6c1287 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.tasks;
+import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -23,9 +24,9 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
@@ -34,11 +35,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
-import java.util.Properties;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -65,17 +61,8 @@ public class DefaultTaskManagerTest {
private final StreamsException exception = mock(StreamsException.class);
private final TaskExecutionMetadata taskExecutionMetadata =
mock(TaskExecutionMetadata.class);
- private final StreamsConfig config = new StreamsConfig(configProps());
- private final TaskManager taskManager = new DefaultTaskManager(time,
"TaskManager", tasks, config,
- (taskManager, name, time, taskExecutionMetadata) -> taskExecutor,
taskExecutionMetadata);
-
- private Properties configProps() {
- return mkObjectProperties(mkMap(
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
- mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2)
- ));
- }
+ private final TaskManager taskManager = new DefaultTaskManager(time,
"TaskManager", tasks,
+ (taskManager, name, time, taskExecutionMetadata) -> taskExecutor,
taskExecutionMetadata, 1);
@BeforeEach
public void setUp() {
@@ -84,6 +71,22 @@ public class DefaultTaskManagerTest {
when(tasks.task(taskId)).thenReturn(task);
}
+ @Test
+ public void shouldShutdownTaskExecutors() {
+ final Duration duration = mock(Duration.class);
+ taskManager.shutdown(duration);
+
+ verify(taskExecutor).requestShutdown();
+ verify(taskExecutor).awaitShutdown(duration);
+ }
+
+ @Test
+ public void shouldStartTaskExecutors() {
+ taskManager.startTaskExecutors();
+
+ verify(taskExecutor).start();
+ }
+
@Test
public void shouldAddTask() {
taskManager.add(Collections.singleton(task));
@@ -119,7 +122,7 @@ public class DefaultTaskManagerTest {
public void shutdown() {
shutdownRequested.set(true);
- taskManager.signalProcessableTasks();
+ taskManager.signalTaskExecutors();
}
}
@@ -155,7 +158,7 @@ public class DefaultTaskManagerTest {
awaitingThread.start();
verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
- taskManager.signalProcessableTasks();
+ taskManager.signalTaskExecutors();
assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT,
TimeUnit.MILLISECONDS));
@@ -233,6 +236,18 @@ public class DefaultTaskManagerTest {
assertNull(taskManager.assignNextTask(taskExecutor));
}
+ @Test
+ public void shouldNotAssignTasksIfUncaughtExceptionPresent() {
+ taskManager.add(Collections.singleton(task));
+ when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+ ensureTaskMakesProgress();
+ taskManager.assignNextTask(taskExecutor);
+ taskManager.setUncaughtException(new StreamsException("Exception"),
taskId);
+ taskManager.unassignTask(task, taskExecutor);
+
+ assertNull(taskManager.assignNextTask(taskExecutor));
+ }
+
@Test
public void shouldNotAssignTasksForPunctuationIfPunctuationDisabled() {
taskManager.add(Collections.singleton(task));
@@ -314,6 +329,7 @@ public class DefaultTaskManagerTest {
public void shouldNotAssignLockedTask() {
taskManager.add(Collections.singleton(task));
when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+ when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
when(tasks.task(task.id())).thenReturn(task);
when(tasks.contains(task.id())).thenReturn(true);
@@ -322,10 +338,48 @@ public class DefaultTaskManagerTest {
assertNull(taskManager.assignNextTask(taskExecutor));
}
+ @Test
+ public void shouldLockAnEmptySetOfTasks() {
+ taskManager.add(Collections.singleton(task));
+ when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+ when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
+ when(tasks.task(task.id())).thenReturn(task);
+ when(tasks.contains(task.id())).thenReturn(true);
+
+ assertTrue(taskManager.lockTasks(Collections.emptySet()).isDone());
+
+ assertEquals(task, taskManager.assignNextTask(taskExecutor));
+ }
+
+
+ @Test
+ public void shouldLockATaskThatWasVoluntarilyReleased() {
+ final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
+
+ taskManager.add(Collections.singleton(task));
+ when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+ when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
+ when(tasks.task(task.id())).thenReturn(task);
+ when(tasks.contains(task.id())).thenReturn(true);
+ when(taskExecutor.unassign()).thenReturn(future);
+
+ assertEquals(task, taskManager.assignNextTask(taskExecutor));
+
+ final KafkaFuture<Void> lockingFuture =
taskManager.lockTasks(Collections.singleton(task.id()));
+ assertFalse(lockingFuture.isDone());
+
+ taskManager.unassignTask(task, taskExecutor);
+ future.complete(null);
+
+ assertTrue(lockingFuture.isDone());
+ assertNull(taskManager.assignNextTask(taskExecutor));
+ }
+
@Test
public void shouldNotAssignAnyLockedTask() {
taskManager.add(Collections.singleton(task));
when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+ when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
when(tasks.task(task.id())).thenReturn(task);
when(tasks.contains(task.id())).thenReturn(true);
@@ -379,12 +433,21 @@ public class DefaultTaskManagerTest {
assertEquals(task, taskManager.assignNextTask(taskExecutor));
+ when(taskExecutor.currentTask()).thenReturn(new ReadOnlyTask(task));
final KafkaFuture<Void> lockFuture = taskManager.lockAllTasks();
assertFalse(lockFuture.isDone());
verify(taskExecutor).unassign();
+ taskManager.unassignTask(task, taskExecutor);
future.complete(task);
+
assertTrue(lockFuture.isDone());
}
+
+ private void ensureTaskMakesProgress() {
+
when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(true);
+ when(task.canPunctuateStreamTime()).thenReturn(true);
+ }
+
}