This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5f20750dc14 Kafka Streams Threading: Exception handling (#13957)
5f20750dc14 is described below

commit 5f20750dc14c56538e904c363d27264f02e74d82
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jul 13 14:33:39 2023 +0200

    Kafka Streams Threading: Exception handling (#13957)
    
    Catch any exceptions that escape the processing logic
    inside TaskExecutors and record them in the TaskManager.
    Make sure the TaskExecutor survives, but the task is
    unassigned. Add a method to TaskManager to drain the
    exceptions. The aim here is that the polling thread will
    drain the exceptions to be able to execute the
    uncaught exception handler, abort transactions, etc.
    
    Reviewer: Bruno Cadonna <[email protected]>
---
 .../internals/tasks/DefaultTaskExecutor.java       | 14 ++++++++-
 .../internals/tasks/DefaultTaskManager.java        | 36 ++++++++++++++++++++++
 .../processor/internals/tasks/TaskManager.java     | 23 ++++++++++++++
 .../internals/tasks/DefaultTaskExecutorTest.java   | 15 +++++++++
 .../internals/tasks/DefaultTaskManagerTest.java    | 32 +++++++++++++++++++
 5 files changed, 119 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
index c03384f4daa..b59d695d9b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
@@ -54,7 +54,10 @@ public class DefaultTaskExecutor implements TaskExecutor {
                 while (isRunning.get()) {
                     runOnce(time.milliseconds());
                 }
-                // TODO: add exception handling
+            } catch (final StreamsException e) {
+                handleException(e);
+            } catch (final Exception e) {
+                handleException(new StreamsException(e));
             } finally {
                 if (currentTask != null) {
                     unassignCurrentTask();
@@ -65,6 +68,15 @@ public class DefaultTaskExecutor implements TaskExecutor {
             }
         }
 
+        private void handleException(final StreamsException e) {
+            if (currentTask != null) {
+                taskManager.setUncaughtException(e, currentTask.id());
+            } else {
+                // If we do not currently have a task assigned and still get an error, this is fatal for the executor thread
+                throw e;
+            }
+        }
+
         private void runOnce(final long nowMs) {
             final KafkaFutureImpl<StreamTask> pauseFuture;
             if ((pauseFuture = pauseRequested.getAndSet(null)) != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
index 3f97de85ceb..41526356d61 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
 import org.apache.kafka.streams.processor.internals.StreamTask;
@@ -55,6 +56,7 @@ public class DefaultTaskManager implements TaskManager {
 
     private final Lock tasksLock = new ReentrantLock();
     private final List<TaskId> lockedTasks = new ArrayList<>();
+    private final Map<TaskId, StreamsException> uncaughtExceptions = new HashMap<>();
     private final Map<TaskId, TaskExecutor> assignedTasks = new HashMap<>();
 
     private final List<TaskExecutor> taskExecutors;
@@ -225,6 +227,40 @@ public class DefaultTaskManager implements TaskManager {
         return returnWithTasksLocked(() -> tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
     }
 
+    @Override
+    public void setUncaughtException(final StreamsException exception, final TaskId taskId) {
+        executeWithTasksLocked(() -> {
+
+            if (!assignedTasks.containsKey(taskId)) {
+                throw new IllegalArgumentException("An uncaught exception can only be set as long as the task is still assigned");
+            }
+
+            if (uncaughtExceptions.containsKey(taskId)) {
+                throw new IllegalArgumentException("The uncaught exception must be cleared before restarting processing");
+            }
+
+            uncaughtExceptions.put(taskId, exception);
+        });
+
+        log.info("Set an uncaught exception of type {} for task {}, with error message: {}",
+            exception.getClass().getName(),
+            taskId,
+            exception.getMessage());
+    }
+
+    public Map<TaskId, StreamsException> drainUncaughtExceptions() {
+        final Map<TaskId, StreamsException> returnValue = returnWithTasksLocked(() -> {
+            final Map<TaskId, StreamsException> result = new HashMap<>(uncaughtExceptions);
+            uncaughtExceptions.clear();
+            return result;
+        });
+
+        log.info("Drained {} uncaught exceptions", returnValue.size());
+
+        return returnValue;
+    }
+
+
     private void executeWithTasksLocked(final Runnable action) {
         tasksLock.lock();
         try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
index cfca9f75fe8..707719a00a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.streams.processor.internals.tasks;
 
+import java.util.Map;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
 import org.apache.kafka.streams.processor.internals.StreamTask;
@@ -98,4 +100,25 @@ public interface TaskManager {
      * @return set of all managed active tasks
      */
     Set<ReadOnlyTask> getTasks();
+
+    /**
+     * Called whenever an existing task has thrown an uncaught exception.
+     *
+     * Setting an uncaught exception for a task prevents it from being reassigned until the
+     * corresponding exception has been handled in the polling thread.
+     *
+     */
+    void setUncaughtException(StreamsException exception, TaskId taskId);
+
+    /**
+     * Returns and clears all uncaught exceptions that fell through to the processing
+     * threads and need to be handled in the polling thread.
+     *
+     * Called by the polling thread to handle processing exceptions, e.g. to abort
+     * transactions or shut down the application.
+     *
+     * @return A map from task ID to the exception that occurred.
+     */
+    Map<TaskId, StreamsException> drainUncaughtExceptions();
+
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
index c2faeef880b..db33a0d7c67 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals.tasks;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -104,4 +105,18 @@ public class DefaultTaskExecutorTest {
         assertTrue(future.isDone(), "Unassign is not completed");
         assertEquals(task, future.get(), "Unexpected task was unassigned");
     }
+
+    @Test
+    public void shouldSetUncaughtStreamsException() {
+        final StreamsException exception = mock(StreamsException.class);
+        when(task.process(anyLong())).thenThrow(exception);
+
+        taskExecutor.start();
+
+        verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
+        verify(taskManager, timeout(VERIFICATION_TIMEOUT)).setUncaughtException(exception, task.id());
+        verify(taskManager, timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor);
+        assertNull(taskExecutor.currentTask());
+    }
+
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
index e17a724f365..16e944d9704 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.TasksRegistry;
@@ -50,6 +51,7 @@ public class DefaultTaskManagerTest {
     private final StreamTask task = mock(StreamTask.class);
     private final TasksRegistry tasks = mock(TasksRegistry.class);
     private final TaskExecutor taskExecutor = mock(TaskExecutor.class);
+    private final StreamsException exception = mock(StreamsException.class);
 
     private final StreamsConfig config = new StreamsConfig(configProps());
     private final TaskManager taskManager = new DefaultTaskManager(time, "TaskManager", tasks, config,
@@ -166,6 +168,36 @@ public class DefaultTaskManagerTest {
         assertNull(taskManager.assignNextTask(taskExecutor));
     }
 
+    @Test
+    public void shouldNotSetUncaughtExceptionsForUnassignedTasks() {
+        taskManager.add(Collections.singleton(task));
+
+        final Exception e = assertThrows(IllegalArgumentException.class, () -> taskManager.setUncaughtException(exception, task.id()));
+        assertEquals("An uncaught exception can only be set as long as the task is still assigned", e.getMessage());
+    }
+
+    @Test
+    public void shouldNotSetUncaughtExceptionsTwice() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        taskManager.assignNextTask(taskExecutor);
+        taskManager.setUncaughtException(exception, task.id());
+
+        final Exception e = assertThrows(IllegalArgumentException.class, () -> taskManager.setUncaughtException(exception, task.id()));
+        assertEquals("The uncaught exception must be cleared before restarting processing", e.getMessage());
+    }
+
+    @Test
+    public void shouldReturnAndClearExceptionsOnDrainExceptions() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        taskManager.assignNextTask(taskExecutor);
+        taskManager.setUncaughtException(exception, task.id());
+
+        assertEquals(taskManager.drainUncaughtExceptions(), Collections.singletonMap(task.id(), exception));
+        assertEquals(taskManager.drainUncaughtExceptions(), Collections.emptyMap());
+    }
+
     @Test
     public void shouldUnassignLockingTask() {
         final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();

Reply via email to