This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 55a3a95b7a1 Kafka Streams Threading P3: TaskManager Impl (#12754)
55a3a95b7a1 is described below

commit 55a3a95b7a1f61c2a93a6341d3e091f68b048234
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Fri Oct 14 16:10:57 2022 -0700

    Kafka Streams Threading P3: TaskManager Impl (#12754)
    
    0. Add name to task executors.
    1. DefaultTaskManager implementation, for interacting with the TaskExecutors and supporting add/remove/lock APIs.
    2. Related unit tests.
---
 .../internals/tasks/DefaultTaskExecutor.java       |  13 +-
 .../internals/tasks/DefaultTaskManager.java        | 246 +++++++++++++++++++++
 .../processor/internals/tasks/TaskExecutor.java    |   7 +-
 .../internals/tasks/TaskExecutorCreator.java       |  24 ++
 .../internals/tasks/DefaultTaskExecutorTest.java   |   2 +-
 .../internals/tasks/DefaultTaskManagerTest.java    | 189 ++++++++++++++++
 6 files changed, 476 insertions(+), 5 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
index 5ad0950fc51..c03384f4daa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.internals.DefaultStateUpdater;
 import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.slf4j.Logger;
@@ -45,7 +44,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
             super(name);
             final String logPrefix = String.format("%s ", name);
             final LogContext logContext = new LogContext(logPrefix);
-            log = logContext.logger(DefaultStateUpdater.class);
+            log = logContext.logger(DefaultTaskExecutor.class);
         }
 
         @Override
@@ -102,6 +101,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
     }
 
     private final Time time;
+    private final String name;
     private final TaskManager taskManager;
 
     private StreamTask currentTask = null;
@@ -109,15 +109,22 @@ public class DefaultTaskExecutor implements TaskExecutor {
     private CountDownLatch shutdownGate;
 
     public DefaultTaskExecutor(final TaskManager taskManager,
+                               final String name,
                                final Time time) {
         this.time = time;
+        this.name = name;
         this.taskManager = taskManager;
     }
 
+    @Override
+    public String name() {
+        return name;
+    }
+
     @Override
     public void start() {
         if (taskExecutorThread == null) {
-            taskExecutorThread = new TaskExecutorThread("task-executor");
+            taskExecutorThread = new TaskExecutorThread(name);
             taskExecutorThread.start();
             shutdownGate = new CountDownLatch(1);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
new file mode 100644
index 00000000000..3f97de85ceb
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.tasks;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.Task;
+import org.apache.kafka.streams.processor.internals.TasksRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * An active task can only be in one of the following states:
+ *
+ * 1. It's assigned to one of the executors for processing.
+ * 2. It's locked for committing, removal, other manipulations etc.
+ * 3. Neither 1 nor 2, i.e. it stays idle. This is possible if we do not have enough executors or because those tasks
+ *    are not processable (e.g. because no records fetched) yet.
+ */
+public class DefaultTaskManager implements TaskManager {
+
+    private final Time time;
+    private final Logger log;
+    private final TasksRegistry tasks;
+
+    private final Lock tasksLock = new ReentrantLock();
+    private final List<TaskId> lockedTasks = new ArrayList<>();
+    private final Map<TaskId, TaskExecutor> assignedTasks = new HashMap<>();
+
+    private final List<TaskExecutor> taskExecutors;
+
+    /**
+     * {@link TaskExecutorCreator} implementation that builds a {@link DefaultTaskExecutor}
+     * from the task manager, executor name and time source it is given.
+     */
+    static class DefaultTaskExecutorCreator implements TaskExecutorCreator {
+        @Override
+        public TaskExecutor create(final TaskManager taskManager, final String 
name, final Time time) {
+            return new DefaultTaskExecutor(taskManager, name, time);
+        }
+    }
+
+    /**
+     * Creates the task manager together with its task executors.
+     *
+     * The number of executors is read from {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG}; each one is
+     * obtained from {@code executorCreator} and named {@code <clientId>-TaskExecutor-<i>} with {@code i}
+     * counting from 1. The executors are only created here; this constructor does not start them.
+     *
+     * @param time            time source, also handed to every created executor
+     * @param clientId        used as the log prefix and as the prefix of the executor names
+     * @param tasks           registry holding the tasks managed by this instance
+     * @param config          configuration the executor count is read from
+     * @param executorCreator factory used to create each executor
+     */
+    public DefaultTaskManager(final Time time,
+                              final String clientId,
+                              final TasksRegistry tasks,
+                              final StreamsConfig config,
+                              final TaskExecutorCreator executorCreator) {
+        final String logPrefix = String.format("%s ", clientId);
+        final LogContext logContext = new LogContext(logPrefix);
+        this.log = logContext.logger(DefaultTaskManager.class);
+        this.time = time;
+        this.tasks = tasks;
+
+        final int numExecutors = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        this.taskExecutors = new ArrayList<>(numExecutors);
+        for (int i = 1; i <= numExecutors; i++) {
+            final String name = clientId + "-TaskExecutor-" + i;
+            // NOTE(review): `this` is handed to the creator before the constructor has returned
+            this.taskExecutors.add(executorCreator.create(this, name, time));
+        }
+    }
+
+    /**
+     * Picks the next task for the given executor to process.
+     *
+     * Walks the registry's active tasks and hands out the first one that is not already assigned, is not
+     * locked, and reports itself processable at the current time. The assignment is recorded before the
+     * task is returned.
+     *
+     * @param executor the executor asking for work
+     * @return the task now assigned to the executor, or {@code null} if no task qualifies
+     * @throws IllegalArgumentException if the executor is not one of this manager's executors
+     */
+    @Override
+    public StreamTask assignNextTask(final TaskExecutor executor) {
+        return returnWithTasksLocked(() -> {
+            if (!taskExecutors.contains(executor)) {
+                throw new IllegalArgumentException("The requested executor for 
getting next task to assign is unrecognized");
+            }
+
+            // the most naive scheduling algorithm for now: give the next unlocked, unassigned, and processable task
+            for (final Task task : tasks.activeTasks()) {
+                if (!assignedTasks.containsKey(task.id()) &&
+                    !lockedTasks.contains(task.id()) &&
+                    ((StreamTask) task).isProcessable(time.milliseconds())) {
+
+                    assignedTasks.put(task.id(), executor);
+
+                    log.info("Assigned {} to executor {}", task.id(), 
executor.name());
+
+                    return (StreamTask) task;
+                }
+            }
+
+            return null;
+        });
+    }
+
+    /**
+     * Releases a task that was previously handed to the given executor, so it can be assigned again.
+     *
+     * @param task     the task to release
+     * @param executor the executor giving the task back
+     * @throws IllegalArgumentException if the executor is not one of this manager's executors, or if the
+     *                                  task is not currently recorded as assigned to that same executor
+     */
+    @Override
+    public void unassignTask(final StreamTask task, final TaskExecutor 
executor) {
+        executeWithTasksLocked(() -> {
+            if (!taskExecutors.contains(executor)) {
+                throw new IllegalArgumentException("The requested executor for 
unassign task is unrecognized");
+            }
+
+            // despite its name this is the executor the task is assigned to; the comparison is by reference
+            final TaskExecutor lockedExecutor = assignedTasks.get(task.id());
+            if (lockedExecutor == null || lockedExecutor != executor) {
+                throw new IllegalArgumentException("Task " + task.id() + " is 
not locked by the executor");
+            }
+
+            assignedTasks.remove(task.id());
+
+            log.info("Unassigned {} from executor {}", task.id(), 
executor.name());
+        });
+    }
+
+    /**
+     * Locks the given tasks so that they are no longer handed out by {@link #assignNextTask(TaskExecutor)}.
+     *
+     * Tasks that are currently assigned to an executor are requested back via {@link TaskExecutor#unassign()};
+     * the returned future completes once every such executor has answered. If none of the tasks is
+     * assigned, or if {@code taskIds} is empty, the returned future is already completed.
+     *
+     * All ids are validated before any of them is locked, so a rejected call leaves the set of locked
+     * tasks untouched.
+     *
+     * @param taskIds ids of the active tasks to lock
+     * @return a future that completes when all given tasks are locked and unassigned, or completes
+     *         exceptionally if an executor fails to hand its task back
+     * @throws IllegalArgumentException if one of the ids is not owned by the registry or is not an active task
+     */
+    @Override
+    public KafkaFuture<Void> lockTasks(final Set<TaskId> taskIds) {
+        return returnWithTasksLocked(() -> {
+            final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+
+            // validate first: throwing after lockedTasks was modified would leave tasks locked by a failed call
+            for (final TaskId taskId : taskIds) {
+                final Task task = tasks.task(taskId);
+
+                if (task == null) {
+                    throw new IllegalArgumentException("Trying to lock task " + taskId + " but it's not owned");
+                }
+
+                if (!task.isActive()) {
+                    throw new IllegalArgumentException("The locking task " + taskId + " is not an active task");
+                }
+            }
+
+            // nothing to wait for; without this the loop below never runs and the future would never complete
+            if (taskIds.isEmpty()) {
+                result.complete(null);
+                return result;
+            }
+
+            lockedTasks.addAll(taskIds);
+
+            // concurrent set: the callbacks below may run on executor threads
+            final Set<TaskId> remainingTaskIds = new ConcurrentSkipListSet<>(taskIds);
+
+            for (final TaskId taskId : taskIds) {
+                final TaskExecutor assignedExecutor = assignedTasks.get(taskId);
+
+                if (assignedExecutor != null) {
+                    final KafkaFuture<StreamTask> future = assignedExecutor.unassign();
+                    future.whenComplete((streamTask, throwable) -> {
+                        if (throwable != null) {
+                            result.completeExceptionally(throwable);
+                        } else {
+                            // use the id we asked for rather than streamTask.id(): the executor may complete
+                            // the future with null if it no longer holds a task
+                            remainingTaskIds.remove(taskId);
+                            if (remainingTaskIds.isEmpty()) {
+                                result.complete(null);
+                            }
+                        }
+                    });
+                } else {
+                    remainingTaskIds.remove(taskId);
+                    if (remainingTaskIds.isEmpty()) {
+                        result.complete(null);
+                    }
+                }
+            }
+
+            return result;
+        });
+    }
+
+    /**
+     * Locks every task the registry currently reports as active by delegating to {@link #lockTasks(Set)}.
+     *
+     * @return the future returned by {@link #lockTasks(Set)} for the collected task ids
+     */
+    @Override
+    public KafkaFuture<Void> lockAllTasks() {
+        // the nested lockTasks call takes tasksLock again, which is fine because it is a ReentrantLock
+        return returnWithTasksLocked(() ->
+            
lockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet()))
+        );
+    }
+
+    /**
+     * Removes the given task ids from the set of locked tasks. Ids that are not locked are ignored.
+     *
+     * @param taskIds ids of the tasks to unlock
+     */
+    @Override
+    public void unlockTasks(final Set<TaskId> taskIds) {
+        executeWithTasksLocked(() -> lockedTasks.removeAll(taskIds));
+    }
+
+    /**
+     * Unlocks every task the registry currently reports as active by delegating to
+     * {@link #unlockTasks(Set)}.
+     *
+     * Only ids of currently active tasks are passed on, so a locked id whose task is no longer reported
+     * by the registry stays in the locked list.
+     */
+    @Override
+    public void unlockAllTasks() {
+        // the nested unlockTasks call takes tasksLock again, which is fine because it is a ReentrantLock
+        executeWithTasksLocked(() -> 
unlockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet())));
+    }
+
+    /**
+     * Adds the given tasks to the registry while holding the tasks lock, then logs the addition.
+     *
+     * @param tasksToAdd the tasks to hand over to this manager
+     */
+    @Override
+    public void add(final Set<StreamTask> tasksToAdd) {
+        executeWithTasksLocked(() -> {
+            for (final StreamTask task : tasksToAdd) {
+                tasks.addTask(task);
+            }
+        });
+
+        // logged after the lock has been released
+        log.info("Added tasks {} to the task manager to process", tasksToAdd);
+    }
+
+    /**
+     * Removes a task from the registry.
+     *
+     * The task must be locked, must not be assigned to any executor, and must be owned by the registry.
+     * The id is not taken out of the locked list here.
+     * NOTE(review): presumably the caller is expected to unlock it afterwards - confirm.
+     *
+     * @param taskId id of the task to remove
+     * @throws IllegalArgumentException if the task is still assigned, is not locked, or is not in the registry
+     */
+    @Override
+    public void remove(final TaskId taskId) {
+        executeWithTasksLocked(() -> {
+            if (assignedTasks.containsKey(taskId)) {
+                throw new IllegalArgumentException("The task to remove is 
still assigned to executors");
+            }
+
+            if (!lockedTasks.contains(taskId)) {
+                throw new IllegalArgumentException("The task to remove is not 
locked yet by the task manager");
+            }
+
+            if (!tasks.contains(taskId)) {
+                throw new IllegalArgumentException("The task to remove is not 
owned by the task manager");
+            }
+
+            tasks.removeTask(tasks.task(taskId));
+        });
+
+        // only reached when the removal succeeded; logged after the lock has been released
+        log.info("Removed task {} from the task manager", taskId);
+    }
+
+    /**
+     * Returns the registry's active tasks, each wrapped in a {@link ReadOnlyTask}.
+     *
+     * @return a newly collected set with one read-only wrapper per active task
+     */
+    @Override
+    public Set<ReadOnlyTask> getTasks() {
+        return returnWithTasksLocked(() -> 
tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
+    }
+
+    /**
+     * Runs the action while holding {@code tasksLock}; the lock is released even if the action throws.
+     *
+     * @param action the work to perform under the lock
+     */
+    private void executeWithTasksLocked(final Runnable action) {
+        tasksLock.lock();
+        try {
+            action.run();
+        } finally {
+            tasksLock.unlock();
+        }
+    }
+
+    /**
+     * Evaluates the supplier while holding {@code tasksLock} and returns its result; the lock is released
+     * even if the supplier throws.
+     *
+     * @param action the computation to perform under the lock
+     * @return whatever the supplier returns
+     */
+    private <T> T returnWithTasksLocked(final Supplier<T> action) {
+        tasksLock.lock();
+        try {
+            return action.get();
+        } finally {
+            tasksLock.unlock();
+        }
+    }
+}
+
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
index 9e660986a78..04538744a29 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java
@@ -25,7 +25,12 @@ import java.time.Duration;
 public interface TaskExecutor {
 
     /**
-     * Starts the task processor.
+     * @return ID name string of the task executor.
+     */
+    String name();
+
+    /**
+     * Starts the task executor.
      */
     void start();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutorCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutorCreator.java
new file mode 100644
index 00000000000..c18eb973792
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutorCreator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.tasks;
+
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * Factory for {@link TaskExecutor} instances.
+ */
+public interface TaskExecutorCreator {
+
+    /**
+     * Creates a task executor.
+     *
+     * @param taskManager the task manager passed to the new executor
+     * @param name        the name of the new executor
+     * @param time        the time source passed to the new executor
+     * @return the created executor
+     */
+    TaskExecutor create(final TaskManager taskManager, String name, Time time);
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
index 513609366db..c2faeef880b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
@@ -45,7 +45,7 @@ public class DefaultTaskExecutorTest {
     private final StreamTask task = mock(StreamTask.class);
     private final TaskManager taskManager = mock(TaskManager.class);
 
-    private final DefaultTaskExecutor taskExecutor = new 
DefaultTaskExecutor(taskManager, time);
+    private final DefaultTaskExecutor taskExecutor = new 
DefaultTaskExecutor(taskManager, "TaskExecutor", time);
 
     @BeforeEach
     public void setUp() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
new file mode 100644
index 00000000000..e17a724f365
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.tasks;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.TasksRegistry;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class DefaultTaskManagerTest {
+
+    private final Time time = new MockTime(1L);
+    private final StreamTask task = mock(StreamTask.class);
+    private final TasksRegistry tasks = mock(TasksRegistry.class);
+    private final TaskExecutor taskExecutor = mock(TaskExecutor.class);
+
+    private final StreamsConfig config = new StreamsConfig(configProps());
+    private final TaskManager taskManager = new DefaultTaskManager(time, 
"TaskManager", tasks, config,
+        (taskManager, name, time) -> taskExecutor);
+
+    private Properties configProps() {
+        return mkObjectProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
+            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2)
+        ));
+    }
+
+    @BeforeEach
+    public void setUp() {
+        when(task.id()).thenReturn(new TaskId(0, 0, "A"));
+        when(task.isProcessable(anyLong())).thenReturn(true);
+        when(task.isActive()).thenReturn(true);
+    }
+
+    @Test
+    public void shouldAddTask() {
+        taskManager.add(Collections.singleton(task));
+
+        verify(tasks).addTask(task);
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        assertEquals(1, taskManager.getTasks().size());
+    }
+
+    @Test
+    public void shouldAssignTask() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
+        assertEquals(task, taskManager.assignNextTask(taskExecutor));
+        assertNull(taskManager.assignNextTask(taskExecutor));
+    }
+
+    @Test
+    public void shouldUnassignTask() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
+        assertEquals(task, taskManager.assignNextTask(taskExecutor));
+
+        taskManager.unassignTask(task, taskExecutor);
+        assertEquals(task, taskManager.assignNextTask(taskExecutor));
+    }
+
+    @Test
+    public void shouldNotUnassignNotOwnedTask() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
+        assertEquals(task, taskManager.assignNextTask(taskExecutor));
+
+        final TaskExecutor anotherExecutor = mock(TaskExecutor.class);
+        assertThrows(IllegalArgumentException.class, () -> 
taskManager.unassignTask(task, anotherExecutor));
+    }
+
+    @Test
+    public void shouldNotRemoveUnlockedTask() {
+        taskManager.add(Collections.singleton(task));
+
+        assertThrows(IllegalArgumentException.class, () -> 
taskManager.remove(task.id()));
+    }
+
+    /**
+     * Verifies that a task which is still assigned to an executor cannot be removed.
+     *
+     * The task is assigned and then locked while the executor's unassign future is left pending, so the
+     * task is owned, locked and still assigned: the "still assigned" check is the only precondition of
+     * remove() that can fail.
+     */
+    @Test
+    public void shouldNotRemoveAssignedTask() {
+        final KafkaFutureImpl<StreamTask> pendingUnassign = new KafkaFutureImpl<>();
+
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        when(tasks.task(task.id())).thenReturn(task);
+        when(tasks.contains(task.id())).thenReturn(true);
+        when(taskExecutor.unassign()).thenReturn(pendingUnassign);
+
+        // without the activeTasks() stub the mocked registry is empty and nothing would be assigned
+        assertEquals(task, taskManager.assignNextTask(taskExecutor));
+        assertFalse(taskManager.lockTasks(Collections.singleton(task.id())).isDone());
+
+        assertThrows(IllegalArgumentException.class, () -> 
taskManager.remove(task.id()));
+    }
+
+    @Test
+    public void shouldRemoveTask() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        when(tasks.task(task.id())).thenReturn(task);
+        when(tasks.contains(task.id())).thenReturn(true);
+
+        taskManager.lockTasks(Collections.singleton(task.id()));
+        taskManager.remove(task.id());
+
+        verify(tasks).removeTask(task);
+        reset(tasks);
+        when(tasks.activeTasks()).thenReturn(Collections.emptySet());
+
+        assertEquals(0, taskManager.getTasks().size());
+    }
+
+    @Test
+    public void shouldNotAssignLockedTask() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        when(tasks.task(task.id())).thenReturn(task);
+        when(tasks.contains(task.id())).thenReturn(true);
+
+        
assertTrue(taskManager.lockTasks(Collections.singleton(task.id())).isDone());
+
+        assertNull(taskManager.assignNextTask(taskExecutor));
+    }
+
+    @Test
+    public void shouldNotAssignAnyLockedTask() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        when(tasks.task(task.id())).thenReturn(task);
+        when(tasks.contains(task.id())).thenReturn(true);
+
+        assertTrue(taskManager.lockAllTasks().isDone());
+
+        assertNull(taskManager.assignNextTask(taskExecutor));
+    }
+
+    @Test
+    public void shouldUnassignLockingTask() {
+        final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
+
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        when(tasks.task(task.id())).thenReturn(task);
+        when(tasks.contains(task.id())).thenReturn(true);
+        when(taskExecutor.unassign()).thenReturn(future);
+
+        assertEquals(task, taskManager.assignNextTask(taskExecutor));
+
+        final KafkaFuture<Void> lockFuture = taskManager.lockAllTasks();
+        assertFalse(lockFuture.isDone());
+
+        verify(taskExecutor).unassign();
+
+        future.complete(task);
+        assertTrue(lockFuture.isDone());
+    }
+}

Reply via email to