This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6263197a623 KAFKA-15326: [9/N] Start and stop executors and 
cornercases (#14281)
6263197a623 is described below

commit 6263197a62389c9dce0210cd9d65a6e601345edc
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Oct 2 15:41:21 2023 +0200

    KAFKA-15326: [9/N] Start and stop executors and cornercases (#14281)
    
    * Implements start and stop of task executors
    * Introduce flush operation to keep consumer operations out of the 
processing threads
    * Fixes corner case: handle requested unassignment during shutdown
    * Fixes corner case: handle race between voluntary unassignment and 
requested unassignment
    * Fixes corner case: task locking future completes for the empty set
    * Fixes corner case: we should not reassign a task with an uncaught 
exception to a task executor
    * Improved logging
    * Number of threads controlled from outside of the TaskManager
    
    Reviewers: Bruno Cadonna <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../streams/processor/internals/StreamTask.java    |   9 +-
 .../internals/tasks/DefaultTaskExecutor.java       |  90 +++++++++++++-----
 .../internals/tasks/DefaultTaskManager.java        |  66 +++++++++++---
 .../processor/internals/tasks/TaskExecutor.java    |  21 ++++-
 .../processor/internals/tasks/TaskManager.java     |  27 +++++-
 .../processor/internals/StreamTaskTest.java        |  13 +++
 .../internals/tasks/DefaultTaskExecutorTest.java   |  50 ++++++++--
 .../internals/tasks/DefaultTaskManagerTest.java    | 101 +++++++++++++++++----
 9 files changed, 313 insertions(+), 66 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f01281acb0e..29ef2ff0ad8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -235,7 +235,7 @@
               
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|KTableKTableForeignKeyVersionedJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
 
     <suppress checks="JavaNCSS"
-              
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>
+              
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest|StreamTaskTest).java"/>
 
     <suppress checks="NPathComplexity"
               
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|KTableKTableForeignKeyVersionedJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest).java"/>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 022ddbcf4c4..11d658b3d8f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -394,6 +394,12 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         timeCurrentIdlingStarted = Optional.empty();
     }
 
+
+    public void flush() {
+        stateMgr.flushCache();
+        recordCollector.flush();
+    }
+
     /**
      * @throws StreamsException fatal error that should cause the thread to die
      * @throws TaskMigratedException recoverable error that would cause the 
task to be removed
@@ -414,8 +420,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                     // cached records to be processed and hence generate more 
records to be sent out
                     //
                     // TODO: this should be removed after we decouple caching 
with emitting
-                    stateMgr.flushCache();
-                    recordCollector.flush();
+                    flush();
                     hasPendingTxCommit = eosEnabled;
 
                     log.debug("Prepared {} task for committing", state());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
index 01e7b6fa902..09445d16076 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
@@ -41,8 +41,8 @@ public class DefaultTaskExecutor implements TaskExecutor {
 
     private class TaskExecutorThread extends Thread {
 
-        private final AtomicBoolean isRunning = new AtomicBoolean(true);
-        private final AtomicReference<KafkaFutureImpl<StreamTask>> 
pauseRequested = new AtomicReference<>(null);
+        private final AtomicBoolean shutdownRequested = new 
AtomicBoolean(false);
+        private final AtomicReference<KafkaFutureImpl<StreamTask>> 
taskReleaseRequested = new AtomicReference<>(null);
 
         private final Logger log;
 
@@ -57,26 +57,52 @@ public class DefaultTaskExecutor implements TaskExecutor {
         public void run() {
             log.info("Task executor thread started");
             try {
-                while (isRunning.get()) {
-                    runOnce(time.milliseconds());
+                while (!shutdownRequested.get()) {
+                    try {
+                        runOnce(time.milliseconds());
+                    } catch (final StreamsException e) {
+                        handleException(e);
+                    } catch (final Exception e) {
+                        handleException(new StreamsException(e));
+                    }
                 }
-            } catch (final StreamsException e) {
-                handleException(e);
-            } catch (final Exception e) {
-                handleException(new StreamsException(e));
             } finally {
                 if (currentTask != null) {
+                    log.debug("Releasing task {} due to shutdown.", 
currentTask.id());
                     unassignCurrentTask();
                 }
 
                 shutdownGate.countDown();
+
+                final KafkaFutureImpl<StreamTask> taskReleaseFuture;
+                if ((taskReleaseFuture = taskReleaseRequested.getAndSet(null)) 
!= null) {
+                    log.debug("Asked to return current task, but shutting 
down.");
+                    taskReleaseFuture.complete(null);
+                }
                 log.info("Task executor thread shutdown");
             }
         }
 
+        private void handleTaskReleaseRequested() {
+            final KafkaFutureImpl<StreamTask> taskReleaseFuture;
+            if ((taskReleaseFuture = taskReleaseRequested.getAndSet(null)) != 
null) {
+                if (currentTask != null) {
+                    log.debug("Releasing task {} upon request.", 
currentTask.id());
+                    final StreamTask unassignedTask = unassignCurrentTask();
+                    taskReleaseFuture.complete(unassignedTask);
+                } else {
+                    log.debug("Asked to return current task, but returned 
current task already.");
+                    taskReleaseFuture.complete(null);
+                }
+            }
+        }
+
         private void handleException(final StreamsException e) {
             if (currentTask != null) {
                 taskManager.setUncaughtException(e, currentTask.id());
+
+                log.debug("Releasing task {} due to uncaught exception.", 
currentTask.id());
+                unassignCurrentTask();
             } else {
                 // If we do not currently have a task assigned and still get 
an error, this is fatal for the executor thread
                 throw e;
@@ -84,11 +110,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
         }
 
         private void runOnce(final long nowMs) {
-            final KafkaFutureImpl<StreamTask> pauseFuture;
-            if ((pauseFuture = pauseRequested.getAndSet(null)) != null) {
-                final StreamTask unassignedTask = unassignCurrentTask();
-                pauseFuture.complete(unassignedTask);
-            }
+            handleTaskReleaseRequested();
 
             if (currentTask == null) {
                 currentTask = 
taskManager.assignNextTask(DefaultTaskExecutor.this);
@@ -105,6 +127,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
 
                 if (taskExecutionMetadata.canProcessTask(currentTask, nowMs) 
&& currentTask.isProcessable(nowMs)) {
                     if (processTask(currentTask, nowMs, time)) {
+                        log.trace("processed a record for {}", 
currentTask.id());
                         progressed = true;
                     }
                 }
@@ -121,6 +144,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
                 }
 
                 if (!progressed) {
+                    log.debug("Releasing task {} because we are not making 
progress.", currentTask.id());
                     unassignCurrentTask();
                 }
             }
@@ -131,10 +155,10 @@ public class DefaultTaskExecutor implements TaskExecutor {
             try {
                 processed = task.process(now);
                 if (processed) {
+                    log.trace("Successfully processed task {}", task.id());
                     task.clearTaskTimeout();
                     // TODO: enable regardless of whether using named 
topologies
                     if (taskExecutionMetadata.hasNamedTopologies() && 
taskExecutionMetadata.processingMode() != EXACTLY_ONCE_V2) {
-                        log.trace("Successfully processed task {}", task.id());
                         taskExecutionMetadata.addToSuccessfullyProcessed(task);
                     }
                 }
@@ -168,9 +192,10 @@ public class DefaultTaskExecutor implements TaskExecutor {
             if (currentTask == null)
                 throw new IllegalStateException("Does not own any task while 
being ask to unassign from task manager");
 
-            // flush the task before giving it back to task manager
-            // TODO: we can add a separate function in StreamTask to just 
flush and not return offsets
-            currentTask.prepareCommit();
+            // flush the task before giving it back to task manager, if we are 
not handing it back because of an error.
+            if (!taskManager.hasUncaughtException(currentTask.id())) {
+                currentTask.flush();
+            }
             taskManager.unassignTask(currentTask, DefaultTaskExecutor.this);
 
             final StreamTask retTask = currentTask;
@@ -183,6 +208,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
     private final String name;
     private final TaskManager taskManager;
     private final TaskExecutionMetadata taskExecutionMetadata;
+    private final Logger log;
 
     private StreamTask currentTask = null;
     private TaskExecutorThread taskExecutorThread = null;
@@ -196,6 +222,8 @@ public class DefaultTaskExecutor implements TaskExecutor {
         this.name = name;
         this.taskManager = taskManager;
         this.taskExecutionMetadata = taskExecutionMetadata;
+        final LogContext logContext = new LogContext(name);
+        this.log = logContext.logger(DefaultTaskExecutor.class);
     }
 
     @Override
@@ -213,10 +241,20 @@ public class DefaultTaskExecutor implements TaskExecutor {
     }
 
     @Override
-    public void shutdown(final Duration timeout) {
+    public boolean isRunning() {
+        return taskExecutorThread != null && taskExecutorThread.isAlive() && 
shutdownGate.getCount() != 0;
+    }
+
+    @Override
+    public void requestShutdown() {
+        if (taskExecutorThread != null) {
+            taskExecutorThread.shutdownRequested.set(true);
+        }
+    }
+
+    @Override
+    public void awaitShutdown(final Duration timeout) {
         if (taskExecutorThread != null) {
-            taskExecutorThread.isRunning.set(false);
-            taskExecutorThread.interrupt();
             try {
                 if (!shutdownGate.await(timeout.toMillis(), 
TimeUnit.MILLISECONDS)) {
                     throw new StreamsException("State updater thread did not 
shutdown within the timeout");
@@ -237,8 +275,18 @@ public class DefaultTaskExecutor implements TaskExecutor {
         final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
 
         if (taskExecutorThread != null) {
-            taskExecutorThread.pauseRequested.set(future);
+            log.debug("Asking {} to hand back task", 
taskExecutorThread.getName());
+            if (!taskExecutorThread.taskReleaseRequested.compareAndSet(null, 
future)) {
+                throw new IllegalStateException("There was already a task 
release request registered");
+            }
+            if (shutdownGate.getCount() == 0) {
+                log.debug("Completing future, because task executor was just 
shut down");
+                future.complete(null);
+            } else {
+                taskManager.signalTaskExecutors();
+            }
         } else {
+            log.debug("Tried to unassign but no thread is running");
             future.complete(null);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
index 4f2d22d4711..56749b4f5f0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.streams.processor.internals.tasks;
 
+import java.time.Duration;
 import java.util.concurrent.locks.Condition;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
@@ -65,7 +65,7 @@ public class DefaultTaskManager implements TaskManager {
 
     private final List<TaskExecutor> taskExecutors;
 
-    static class DefaultTaskExecutorCreator implements TaskExecutorCreator {
+    public static class DefaultTaskExecutorCreator implements 
TaskExecutorCreator {
         @Override
         public TaskExecutor create(final TaskManager taskManager, final String 
name, final Time time, final TaskExecutionMetadata taskExecutionMetadata) {
             return new DefaultTaskExecutor(taskManager, name, time, 
taskExecutionMetadata);
@@ -76,9 +76,9 @@ public class DefaultTaskManager implements TaskManager {
     public DefaultTaskManager(final Time time,
                               final String clientId,
                               final TasksRegistry tasks,
-                              final StreamsConfig config,
                               final TaskExecutorCreator executorCreator,
-                              final TaskExecutionMetadata taskExecutionMetadata
+                              final TaskExecutionMetadata 
taskExecutionMetadata,
+                              final int numExecutors
                               ) {
         final String logPrefix = String.format("%s ", clientId);
         final LogContext logContext = new LogContext(logPrefix);
@@ -87,7 +87,6 @@ public class DefaultTaskManager implements TaskManager {
         this.tasks = tasks;
         this.taskExecutionMetadata = taskExecutionMetadata;
 
-        final int numExecutors = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
         this.taskExecutors = new ArrayList<>(numExecutors);
         for (int i = 1; i <= numExecutors; i++) {
             final String name = clientId + "-TaskExecutor-" + i;
@@ -106,7 +105,8 @@ public class DefaultTaskManager implements TaskManager {
             for (final Task task : tasks.activeTasks()) {
                 if (!assignedTasks.containsKey(task.id()) &&
                     !lockedTasks.contains(task.id()) &&
-                    canProgress((StreamTask) task, time.milliseconds())
+                    canProgress((StreamTask) task, time.milliseconds()) &&
+                    !hasUncaughtException(task.id())
                 ) {
 
                     assignedTasks.put(task.id(), executor);
@@ -117,6 +117,8 @@ public class DefaultTaskManager implements TaskManager {
                 }
             }
 
+            log.debug("Found no assignable task for executor {}", 
executor.name());
+
             return null;
         });
     }
@@ -127,7 +129,8 @@ public class DefaultTaskManager implements TaskManager {
             for (final Task task : tasks.activeTasks()) {
                 if (!assignedTasks.containsKey(task.id()) &&
                     !lockedTasks.contains(task.id()) &&
-                    canProgress((StreamTask) task, time.milliseconds())
+                    canProgress((StreamTask) task, time.milliseconds()) &&
+                    !hasUncaughtException(task.id())
                 ) {
                     log.debug("Await unblocked: returning early from await 
since a processable task {} was found", task.id());
                     return false;
@@ -151,7 +154,7 @@ public class DefaultTaskManager implements TaskManager {
         }
     }
 
-    public void signalProcessableTasks() {
+    public void signalTaskExecutors() {
         log.debug("Waking up task executors");
         executeWithTasksLocked(tasksCondition::signalAll);
     }
@@ -177,10 +180,16 @@ public class DefaultTaskManager implements TaskManager {
 
     @Override
     public KafkaFuture<Void> lockTasks(final Set<TaskId> taskIds) {
+        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+
+        if (taskIds.isEmpty()) {
+            result.complete(null);
+            return result;
+        }
+
         return returnWithTasksLocked(() -> {
             lockedTasks.addAll(taskIds);
 
-            final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
             final Set<TaskId> remainingTaskIds = new 
ConcurrentSkipListSet<>(taskIds);
 
             for (final TaskId taskId : taskIds) {
@@ -195,12 +204,18 @@ public class DefaultTaskManager implements TaskManager {
                 }
 
                 if (assignedTasks.containsKey(taskId)) {
-                    final KafkaFuture<StreamTask> future = 
assignedTasks.get(taskId).unassign();
+                    final TaskExecutor executor = assignedTasks.get(taskId);
+                    log.debug("Requesting release of task {} from {}", taskId, 
executor.name());
+                    final KafkaFuture<StreamTask> future = executor.unassign();
                     future.whenComplete((streamTask, throwable) -> {
                         if (throwable != null) {
                             result.completeExceptionally(throwable);
                         } else {
-                            remainingTaskIds.remove(streamTask.id());
+                            assert !assignedTasks.containsKey(taskId);
+                            // It can happen that the executor handed back the 
task before we asked it to
+                            // in which case `streamTask` will be null here.
+                            assert streamTask == null || streamTask.id() == 
taskId;
+                            remainingTaskIds.remove(taskId);
                             if (remainingTaskIds.isEmpty()) {
                                 result.complete(null);
                             }
@@ -227,6 +242,11 @@ public class DefaultTaskManager implements TaskManager {
 
     @Override
     public void unlockTasks(final Set<TaskId> taskIds) {
+
+        if (taskIds.isEmpty()) {
+            return;
+        }
+
         executeWithTasksLocked(() -> {
             lockedTasks.removeAll(taskIds);
             log.debug("Waking up task executors");
@@ -306,11 +326,17 @@ public class DefaultTaskManager implements TaskManager {
             return result;
         });
 
-        log.debug("Drained {} uncaught exceptions", returnValue.size());
+        if (!returnValue.isEmpty()) {
+            log.debug("Drained {} uncaught exceptions", returnValue.size());
+        }
 
         return returnValue;
     }
 
+    public boolean hasUncaughtException(final TaskId taskId) {
+        return returnWithTasksLocked(() -> 
uncaughtExceptions.containsKey(taskId));
+    }
+
     private void executeWithTasksLocked(final Runnable action) {
         tasksLock.lock();
         try {
@@ -334,5 +360,21 @@ public class DefaultTaskManager implements TaskManager {
             taskExecutionMetadata.canProcessTask(task, nowMs) && 
task.isProcessable(nowMs) ||
                 taskExecutionMetadata.canPunctuateTask(task) && 
(task.canPunctuateStreamTime() || task.canPunctuateSystemTime());
     }
+
+    public void startTaskExecutors() {
+        for (final TaskExecutor t: taskExecutors) {
+            t.start();
+        }
+    }
+
+    public void shutdown(final Duration duration) {
+        for (final TaskExecutor t: taskExecutors) {
+            t.requestShutdown();
+        }
+        signalTaskExecutors();
+        for (final TaskExecutor t: taskExecutors) {
+            t.awaitShutdown(duration);
+        }
+    }
 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
index 04538744a29..aada2fa2341 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
@@ -31,18 +31,35 @@ public interface TaskExecutor {
 
     /**
      * Starts the task executor.
+     * Idempotent operation - will have no effect if thread is already started.
      */
     void start();
 
+    /**
+     * Returns true if the task executor thread is running.
+     */
+    boolean isRunning();
+
+    /**
+     * Asks the task executor to shut down.
+     * Idempotent operation - will have no effect if thread was already asked 
to shut down
+     *
+     * @throws
+     *     org.apache.kafka.streams.errors.StreamsException if the state 
updater thread cannot shutdown within the timeout
+     */
+    void requestShutdown();
+
     /**
      * Shuts down the task processor updater.
+     * Idempotent operation - will have no effect if thread is already shut 
down.
+     * Must call `requestShutdown` first.
      *
      * @param timeout duration how long to wait until the state updater is 
shut down
      *
      * @throws
-     *     org.apache.kafka.streams.errors.StreamsException if the state 
updater thread cannot shutdown within the timeout
+     *     org.apache.kafka.streams.errors.StreamsException if the state 
updater thread does not shutdown within the timeout
      */
-    void shutdown(final Duration timeout);
+    void awaitShutdown(final Duration timeout);
 
     /**
      * Get the current assigned processing task. The task returned is 
read-only and cannot be modified.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
index 63e9f2baecf..6a52f71dd62 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals.tasks;
 
+import java.time.Duration;
 import java.util.Map;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -59,7 +60,7 @@ public interface TaskManager {
     KafkaFuture<Void> lockTasks(final Set<TaskId> taskIds);
 
     /**
-     * Lock all of the managed active tasks from the task manager. Similar to 
{@link #lockTasks(Set)}.
+     * Lock all the managed active tasks from the task manager. Similar to 
{@link #lockTasks(Set)}.
      *
      * This method does not block, instead a future is returned.
      */
@@ -71,7 +72,7 @@ public interface TaskManager {
     void unlockTasks(final Set<TaskId> taskIds);
 
     /**
-     * Unlock all of the managed active tasks from the task manager. Similar 
to {@link #unlockTasks(Set)}.
+     * Unlock all the managed active tasks from the task manager. Similar to 
{@link #unlockTasks(Set)}.
      */
     void unlockAllTasks();
 
@@ -119,14 +120,34 @@ public interface TaskManager {
      */
     Map<TaskId, StreamsException> drainUncaughtExceptions();
 
+    /**
+     * Can be used to check if a specific task has an uncaught exception.
+     *
+     * @param taskId the task ID to check for
+     */
+    boolean hasUncaughtException(final TaskId taskId);
+
     /**
      * Signals that at least one task has become processable, e.g. because it 
was resumed or new records may be available.
      */
-    void signalProcessableTasks();
+    void signalTaskExecutors();
 
     /**
      * Blocks until unassigned processable tasks may be available.
      */
     void awaitProcessableTasks() throws InterruptedException;
 
+    /**
+     * Starts all threads associated with this task manager.
+     */
+    void startTaskExecutors();
+
+    /**
+     * Shuts down all threads associated with this task manager.
+     * All tasks will be unlocked and unassigned by the end of this.
+     *
+     * @param duration Time to wait for each thread to shut down.
+     */
+    void shutdown(final Duration duration);
+
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 7ed11a61ece..e295d29e419 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -2323,6 +2323,19 @@ public class StreamTaskTest {
         assertThat(task.state(), is(Task.State.CLOSED));
     }
 
+    @Test
+    public void shouldFlushStateManagerAndRecordCollector() {
+        stateManager.flush();
+        EasyMock.expectLastCall().once();
+        recordCollector.flush();
+        EasyMock.expectLastCall().once();
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig("100"), false);
+
+        task.flush();
+    }
+
     @Test
     public void shouldClearCommitStatusesInCloseDirty() {
         task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), 
StreamsConfig.METRICS_LATEST);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
index d24f699c3da..a697aabf3bc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
 import org.junit.jupiter.api.AfterEach;
@@ -31,6 +32,7 @@ import java.time.Duration;
 import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -43,6 +45,8 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
 public class DefaultTaskExecutorTest {
 
     private final static long VERIFICATION_TIMEOUT = 15000;
@@ -60,29 +64,51 @@ public class DefaultTaskExecutorTest {
         
when(taskManager.assignNextTask(taskExecutor)).thenReturn(task).thenReturn(null);
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
         when(task.isProcessable(anyLong())).thenReturn(true);
+        when(task.id()).thenReturn(new TaskId(0, 0, "A"));
         when(task.process(anyLong())).thenReturn(true);
         when(task.prepareCommit()).thenReturn(Collections.emptyMap());
     }
 
     @AfterEach
     public void tearDown() {
-        taskExecutor.shutdown(Duration.ofMinutes(1));
+        taskExecutor.requestShutdown();
+        taskExecutor.awaitShutdown(Duration.ofMinutes(1));
     }
 
     @Test
     public void shouldShutdownTaskExecutor() {
         assertNull(taskExecutor.currentTask(), "Have task assigned before 
startup");
+        assertFalse(taskExecutor.isRunning());
 
         taskExecutor.start();
 
+        assertTrue(taskExecutor.isRunning());
         verify(taskManager, 
timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
 
-        taskExecutor.shutdown(Duration.ofMinutes(1));
+        taskExecutor.requestShutdown();
+        taskExecutor.awaitShutdown(Duration.ofMinutes(1));
 
-        verify(task).prepareCommit();
+        verify(task).flush();
         verify(taskManager).unassignTask(task, taskExecutor);
 
         assertNull(taskExecutor.currentTask(), "Have task assigned after 
shutdown");
+        assertFalse(taskExecutor.isRunning());
+    }
+
+    @Test
+    public void shouldClearTaskReleaseFutureOnShutdown() throws 
InterruptedException {
+        assertNull(taskExecutor.currentTask(), "Have task assigned before 
startup");
+
+        taskExecutor.start();
+
+        verify(taskManager, 
timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
+
+        final KafkaFuture<StreamTask> future = taskExecutor.unassign();
+        taskExecutor.requestShutdown();
+        taskExecutor.awaitShutdown(Duration.ofMinutes(1));
+
+        waitForCondition(future::isDone, "Await for unassign future to 
complete");
+        assertNull(taskExecutor.currentTask(), "Have task assigned after 
shutdown");
     }
 
     @Test
@@ -104,7 +130,7 @@ public class DefaultTaskExecutorTest {
         taskExecutor.start();
 
         verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task, 
taskExecutor);
-        verify(task).prepareCommit();
+        verify(task).flush();
         assertNull(taskExecutor.currentTask());
     }
 
@@ -209,7 +235,7 @@ public class DefaultTaskExecutorTest {
         final KafkaFuture<StreamTask> future = taskExecutor.unassign();
 
         verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task, 
taskExecutor);
-        verify(task).prepareCommit();
+        verify(task).flush();
         assertNull(taskExecutor.currentTask());
 
         assertTrue(future.isDone(), "Unassign is not completed");
@@ -223,10 +249,22 @@ public class DefaultTaskExecutorTest {
 
         taskExecutor.start();
 
-        verify(taskManager, 
timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
         verify(taskManager, 
timeout(VERIFICATION_TIMEOUT)).setUncaughtException(exception, task.id());
         verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task, 
taskExecutor);
         assertNull(taskExecutor.currentTask());
+        assertTrue(taskExecutor.isRunning(), "should not shut down upon 
exception");
+    }
+
+    @Test
+    public void shouldNotFlushOnException() {
+        final StreamsException exception = mock(StreamsException.class);
+        when(task.process(anyLong())).thenThrow(exception);
+        when(taskManager.hasUncaughtException(task.id())).thenReturn(true);
+
+        taskExecutor.start();
+
+        verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task, 
taskExecutor);
+        verify(task, never()).flush();
     }
 
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
index 5b943334ff5..e0b8a6c1287 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals.tasks;
 
+import java.time.Duration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -23,9 +24,9 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
 import org.apache.kafka.streams.processor.internals.TasksRegistry;
@@ -34,11 +35,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
-import java.util.Properties;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -65,17 +61,8 @@ public class DefaultTaskManagerTest {
     private final StreamsException exception = mock(StreamsException.class);
     private final TaskExecutionMetadata taskExecutionMetadata = 
mock(TaskExecutionMetadata.class);
 
-    private final StreamsConfig config = new StreamsConfig(configProps());
-    private final TaskManager taskManager = new DefaultTaskManager(time, 
"TaskManager", tasks, config,
-        (taskManager, name, time, taskExecutionMetadata) -> taskExecutor, 
taskExecutionMetadata);
-
-    private Properties configProps() {
-        return mkObjectProperties(mkMap(
-            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
-            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
-            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2)
-        ));
-    }
+    private final TaskManager taskManager = new DefaultTaskManager(time, 
"TaskManager", tasks,
+        (taskManager, name, time, taskExecutionMetadata) -> taskExecutor, 
taskExecutionMetadata, 1);
 
     @BeforeEach
     public void setUp() {
@@ -84,6 +71,22 @@ public class DefaultTaskManagerTest {
         when(tasks.task(taskId)).thenReturn(task);
     }
 
+    @Test
+    public void shouldShutdownTaskExecutors() {
+        final Duration duration = mock(Duration.class);
+        taskManager.shutdown(duration);
+
+        verify(taskExecutor).requestShutdown();
+        verify(taskExecutor).awaitShutdown(duration);
+    }
+
+    @Test
+    public void shouldStartTaskExecutors() {
+        taskManager.startTaskExecutors();
+
+        verify(taskExecutor).start();
+    }
+
     @Test
     public void shouldAddTask() {
         taskManager.add(Collections.singleton(task));
@@ -119,7 +122,7 @@ public class DefaultTaskManagerTest {
 
         public void shutdown() {
             shutdownRequested.set(true);
-            taskManager.signalProcessableTasks();
+            taskManager.signalTaskExecutors();
         }
     }
 
@@ -155,7 +158,7 @@ public class DefaultTaskManagerTest {
         awaitingThread.start();
         verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
 
-        taskManager.signalProcessableTasks();
+        taskManager.signalTaskExecutors();
 
         assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, 
TimeUnit.MILLISECONDS));
 
@@ -233,6 +236,18 @@ public class DefaultTaskManagerTest {
         assertNull(taskManager.assignNextTask(taskExecutor));
     }
 
+    @Test
+    public void shouldNotAssignTasksIfUncaughtExceptionPresent() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        ensureTaskMakesProgress();
+        taskManager.assignNextTask(taskExecutor);
+        taskManager.setUncaughtException(new StreamsException("Exception"), 
taskId);
+        taskManager.unassignTask(task, taskExecutor);
+
+        assertNull(taskManager.assignNextTask(taskExecutor));
+    }
+
     @Test
     public void shouldNotAssignTasksForPunctuationIfPunctuationDisabled() {
         taskManager.add(Collections.singleton(task));
@@ -314,6 +329,7 @@ public class DefaultTaskManagerTest {
     public void shouldNotAssignLockedTask() {
         taskManager.add(Collections.singleton(task));
         when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
         when(tasks.task(task.id())).thenReturn(task);
         when(tasks.contains(task.id())).thenReturn(true);
 
@@ -322,10 +338,48 @@ public class DefaultTaskManagerTest {
         assertNull(taskManager.assignNextTask(taskExecutor));
     }
 
+    @Test
+    public void shouldLockAnEmptySetOfTasks() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
+        when(tasks.task(task.id())).thenReturn(task);
+        when(tasks.contains(task.id())).thenReturn(true);
+
+        assertTrue(taskManager.lockTasks(Collections.emptySet()).isDone());
+
+        assertEquals(task, taskManager.assignNextTask(taskExecutor));
+    }
+
+
+    @Test
+    public void shouldLockATaskThatWasVoluntarilyReleased() {
+        final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
+
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
+        when(tasks.task(task.id())).thenReturn(task);
+        when(tasks.contains(task.id())).thenReturn(true);
+        when(taskExecutor.unassign()).thenReturn(future);
+
+        assertEquals(task, taskManager.assignNextTask(taskExecutor));
+
+        final KafkaFuture<Void> lockingFuture = 
taskManager.lockTasks(Collections.singleton(task.id()));
+        assertFalse(lockingFuture.isDone());
+
+        taskManager.unassignTask(task, taskExecutor);
+        future.complete(null);
+
+        assertTrue(lockingFuture.isDone());
+        assertNull(taskManager.assignNextTask(taskExecutor));
+    }
+
     @Test
     public void shouldNotAssignAnyLockedTask() {
         taskManager.add(Collections.singleton(task));
         when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
         when(tasks.task(task.id())).thenReturn(task);
         when(tasks.contains(task.id())).thenReturn(true);
 
@@ -379,12 +433,21 @@ public class DefaultTaskManagerTest {
 
         assertEquals(task, taskManager.assignNextTask(taskExecutor));
 
+        when(taskExecutor.currentTask()).thenReturn(new ReadOnlyTask(task));
         final KafkaFuture<Void> lockFuture = taskManager.lockAllTasks();
         assertFalse(lockFuture.isDone());
 
         verify(taskExecutor).unassign();
 
+        taskManager.unassignTask(task, taskExecutor);
         future.complete(task);
+
         assertTrue(lockFuture.isDone());
     }
+
+    private void ensureTaskMakesProgress() {
+        
when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(true);
+        when(task.canPunctuateStreamTime()).thenReturn(true);
+    }
+
 }


Reply via email to