This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f191126550f KAFKA-10199: Introduce task registry (#12549)
f191126550f is described below
commit f191126550f155c50069a75a82fd44923997642a
Author: Bruno Cadonna <[email protected]>
AuthorDate: Wed Aug 24 08:19:40 2022 +0200
KAFKA-10199: Introduce task registry (#12549)
Currently the task manager stores the tasks it manages in an
internal data structure. We recently extracted the code to store and
retrieve tasks into its own class, Tasks. However, the task manager
creates the Tasks object internally, so tests of the task manager
cannot access it, which makes testing the task manager quite complex.
This commit externalizes the data structure that the task manager
uses to store and retrieve tasks. It introduces the TasksRegistry
interface and makes the Tasks class implement TasksRegistry.
The Tasks object is passed into the task manager via its
constructor. Passing the TasksRegistry dependency to the task
manager from outside facilitates simpler testing of the task
manager.
Reviewers: Guozhang Wang <[email protected]>, Walker Carlson
<[email protected]>
---
.../streams/processor/internals/StreamThread.java | 1 +
.../streams/processor/internals/TaskExecutor.java | 4 +-
.../streams/processor/internals/TaskManager.java | 15 ++--
.../kafka/streams/processor/internals/Tasks.java | 96 ++++++++++++++--------
.../streams/processor/internals/TasksRegistry.java | 89 ++++++++++++++++++++
.../processor/internals/StreamThreadTest.java | 2 +
.../processor/internals/TaskManagerTest.java | 4 +-
.../streams/processor/internals/TasksTest.java | 6 +-
8 files changed, 172 insertions(+), 45 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 5afc747408f..aa1aa2c315e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -393,6 +393,7 @@ public class StreamThread extends Thread {
logPrefix,
activeTaskCreator,
standbyTaskCreator,
+ new Tasks(new LogContext(logPrefix)),
topologyMetadata,
adminClient,
stateDirectory,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index 2827fe1e906..b839c22f5aa 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -46,11 +46,11 @@ import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMo
public class TaskExecutor {
private final Logger log;
- private final Tasks tasks;
+ private final TasksRegistry tasks;
private final TaskManager taskManager;
private final TaskExecutionMetadata executionMetadata;
- public TaskExecutor(final Tasks tasks,
+ public TaskExecutor(final TasksRegistry tasks,
final TaskManager taskManager,
final TaskExecutionMetadata executionMetadata,
final LogContext logContext) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 478b783d68f..e2fefe93d98 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -72,7 +72,7 @@ public class TaskManager {
// by QueryableState
private final Logger log;
private final Time time;
- private final Tasks tasks;
+ private final TasksRegistry tasks;
private final UUID processId;
private final String logPrefix;
private final Admin adminClient;
@@ -102,6 +102,7 @@ public class TaskManager {
final String logPrefix,
final ActiveTaskCreator activeTaskCreator,
final StandbyTaskCreator standbyTaskCreator,
+ final TasksRegistry tasks,
final TopologyMetadata topologyMetadata,
final Admin adminClient,
final StateDirectory stateDirectory,
@@ -121,9 +122,9 @@ public class TaskManager {
this.log = logContext.logger(getClass());
this.stateUpdater = stateUpdater;
- this.tasks = new Tasks(logContext);
+ this.tasks = tasks;
this.taskExecutor = new TaskExecutor(
- tasks,
+ this.tasks,
this,
topologyMetadata.taskExecutionMetadata(),
logContext
@@ -927,7 +928,7 @@ public class TaskManager {
// so we consider all tasks that are either owned or on disk. This
includes stateless tasks, which should
// just have an empty changelogOffsets map.
for (final TaskId id : union(HashSet::new, lockedTaskDirectories,
tasks.allTaskIds())) {
- final Task task = tasks.owned(id) ? tasks.task(id) : null;
+ final Task task = tasks.contains(id) ? tasks.task(id) : null;
// Closed and uninitialized tasks don't have any offsets so we
should read directly from the checkpoint
if (task != null && task.state() != State.CREATED && task.state()
!= State.CLOSED) {
final Map<TopicPartition, Long> changelogOffsets =
task.changelogOffsets();
@@ -969,7 +970,7 @@ public class TaskManager {
final TaskId id = parseTaskDirectoryName(dir.getName(),
namedTopology);
if (stateDirectory.lock(id)) {
lockedTaskDirectories.add(id);
- if (!tasks.owned(id)) {
+ if (!tasks.contains(id)) {
log.debug("Temporarily locked unassigned task {} for
the upcoming rebalance", id);
}
}
@@ -1002,7 +1003,7 @@ public class TaskManager {
final Iterator<TaskId> taskIdIterator =
lockedTaskDirectories.iterator();
while (taskIdIterator.hasNext()) {
final TaskId id = taskIdIterator.next();
- if (!tasks.owned(id)) {
+ if (!tasks.contains(id)) {
stateDirectory.unlock(id);
taskIdIterator.remove();
}
@@ -1575,7 +1576,7 @@ public class TaskManager {
tasks.addTask(task);
}
- Tasks tasks() {
+ TasksRegistry tasks() {
return tasks;
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 8178fe36911..18cfd08bb6b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -40,7 +40,7 @@ import static org.apache.kafka.common.utils.Utils.union;
* i.e. all running active tasks are processed by the former and all restoring
active tasks and standby tasks are
* processed by the latter.
*/
-class Tasks {
+class Tasks implements TasksRegistry {
private final Logger log;
// TODO: convert to Stream/StandbyTask when we remove
TaskManager#StateMachineTask with mocks
@@ -66,12 +66,14 @@ class Tasks {
this.log = logContext.logger(getClass());
}
- void clearPendingTasksToCreate() {
+ @Override
+ public void clearPendingTasksToCreate() {
pendingActiveTasksToCreate.clear();
pendingStandbyTasksToCreate.clear();
}
- Map<TaskId, Set<TopicPartition>>
drainPendingActiveTasksForTopologies(final Set<String> currentTopologies) {
+ @Override
+ public Map<TaskId, Set<TopicPartition>>
drainPendingActiveTasksForTopologies(final Set<String> currentTopologies) {
final Map<TaskId, Set<TopicPartition>> pendingActiveTasksForTopologies
=
filterMap(pendingActiveTasksToCreate, t ->
currentTopologies.contains(t.getKey().topologyName()));
@@ -80,7 +82,8 @@ class Tasks {
return pendingActiveTasksForTopologies;
}
- Map<TaskId, Set<TopicPartition>>
drainPendingStandbyTasksForTopologies(final Set<String> currentTopologies) {
+ @Override
+ public Map<TaskId, Set<TopicPartition>>
drainPendingStandbyTasksForTopologies(final Set<String> currentTopologies) {
final Map<TaskId, Set<TopicPartition>> pendingActiveTasksForTopologies
=
filterMap(pendingStandbyTasksToCreate, t ->
currentTopologies.contains(t.getKey().topologyName()));
@@ -89,56 +92,70 @@ class Tasks {
return pendingActiveTasksForTopologies;
}
- void addPendingActiveTasksToCreate(final Map<TaskId, Set<TopicPartition>>
pendingTasks) {
+ @Override
+ public void addPendingActiveTasksToCreate(final Map<TaskId,
Set<TopicPartition>> pendingTasks) {
pendingActiveTasksToCreate.putAll(pendingTasks);
}
- void addPendingStandbyTasksToCreate(final Map<TaskId, Set<TopicPartition>>
pendingTasks) {
+ @Override
+ public void addPendingStandbyTasksToCreate(final Map<TaskId,
Set<TopicPartition>> pendingTasks) {
pendingStandbyTasksToCreate.putAll(pendingTasks);
}
- Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId) {
+ @Override
+ public Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId)
{
return pendingTasksToRecycle.remove(taskId);
}
- void addPendingTaskToRecycle(final TaskId taskId, final
Set<TopicPartition> inputPartitions) {
+ @Override
+ public void addPendingTaskToRecycle(final TaskId taskId, final
Set<TopicPartition> inputPartitions) {
pendingTasksToRecycle.put(taskId, inputPartitions);
}
- Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId
taskId) {
+ @Override
+ public Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final
TaskId taskId) {
return pendingTasksToUpdateInputPartitions.remove(taskId);
}
- void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final
Set<TopicPartition> inputPartitions) {
+ @Override
+ public void addPendingTaskToUpdateInputPartitions(final TaskId taskId,
final Set<TopicPartition> inputPartitions) {
pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions);
}
- boolean removePendingTaskToCloseDirty(final TaskId taskId) {
+ @Override
+ public boolean removePendingTaskToCloseDirty(final TaskId taskId) {
return pendingTasksToCloseDirty.remove(taskId);
}
- void addPendingTaskToCloseDirty(final TaskId taskId) {
+
+ @Override
+ public void addPendingTaskToCloseDirty(final TaskId taskId) {
pendingTasksToCloseDirty.add(taskId);
}
- boolean removePendingTaskToCloseClean(final TaskId taskId) {
+ @Override
+ public boolean removePendingTaskToCloseClean(final TaskId taskId) {
return pendingTasksToCloseClean.remove(taskId);
}
- void addPendingTaskToCloseClean(final TaskId taskId) {
+ @Override
+ public void addPendingTaskToCloseClean(final TaskId taskId) {
pendingTasksToCloseClean.add(taskId);
}
- Set<Task> drainPendingTaskToInit() {
+ @Override
+ public Set<Task> drainPendingTaskToInit() {
final Set<Task> result = new HashSet<>(pendingTasksToInit);
pendingTasksToInit.clear();
return result;
}
- void addPendingTaskToInit(final Collection<Task> tasks) {
+ @Override
+ public void addPendingTaskToInit(final Collection<Task> tasks) {
pendingTasksToInit.addAll(tasks);
}
- void addNewActiveTasks(final Collection<Task> newTasks) {
+ @Override
+ public void addNewActiveTasks(final Collection<Task> newTasks) {
if (!newTasks.isEmpty()) {
for (final Task activeTask : newTasks) {
final TaskId taskId = activeTask.id();
@@ -159,7 +176,8 @@ class Tasks {
}
}
- void addNewStandbyTasks(final Collection<Task> newTasks) {
+ @Override
+ public void addNewStandbyTasks(final Collection<Task> newTasks) {
if (!newTasks.isEmpty()) {
for (final Task standbyTask : newTasks) {
final TaskId taskId = standbyTask.id();
@@ -177,7 +195,8 @@ class Tasks {
}
}
- void removeTask(final Task taskToRemove) {
+ @Override
+ public void removeTask(final Task taskToRemove) {
final TaskId taskId = taskToRemove.id();
if (taskToRemove.state() != Task.State.CLOSED) {
@@ -196,7 +215,8 @@ class Tasks {
}
}
- void replaceActiveWithStandby(final StandbyTask standbyTask) {
+ @Override
+ public void replaceActiveWithStandby(final StandbyTask standbyTask) {
final TaskId taskId = standbyTask.id();
if (activeTasksPerId.remove(taskId) == null) {
throw new IllegalStateException("Attempted to replace unknown
active task with standby task: " + taskId);
@@ -206,7 +226,8 @@ class Tasks {
standbyTasksPerId.put(standbyTask.id(), standbyTask);
}
- void replaceStandbyWithActive(final StreamTask activeTask) {
+ @Override
+ public void replaceStandbyWithActive(final StreamTask activeTask) {
final TaskId taskId = activeTask.id();
if (standbyTasksPerId.remove(taskId) == null) {
throw new IllegalStateException("Attempted to convert unknown
standby task to stream task: " + taskId);
@@ -218,7 +239,8 @@ class Tasks {
}
}
- boolean updateActiveTaskInputPartitions(final Task task, final
Set<TopicPartition> topicPartitions) {
+ @Override
+ public boolean updateActiveTaskInputPartitions(final Task task, final
Set<TopicPartition> topicPartitions) {
final boolean requiresUpdate =
!task.inputPartitions().equals(topicPartitions);
if (requiresUpdate) {
log.debug("Update task {} inputPartitions: current {}, new {}",
task, task.inputPartitions(), topicPartitions);
@@ -243,14 +265,16 @@ class Tasks {
toBeRemoved.forEach(activeTasksPerPartition::remove);
}
- void clear() {
+ @Override
+ public void clear() {
activeTasksPerId.clear();
standbyTasksPerId.clear();
activeTasksPerPartition.clear();
}
// TODO: change return type to `StreamTask`
- Task activeTasksForInputPartition(final TopicPartition partition) {
+ @Override
+ public Task activeTasksForInputPartition(final TopicPartition partition) {
return activeTasksPerPartition.get(partition);
}
@@ -264,7 +288,8 @@ class Tasks {
return null;
}
- Task task(final TaskId taskId) {
+ @Override
+ public Task task(final TaskId taskId) {
final Task task = getTask(taskId);
if (task != null)
@@ -273,7 +298,8 @@ class Tasks {
throw new IllegalStateException("Task unknown: " + taskId);
}
- Collection<Task> tasks(final Collection<TaskId> taskIds) {
+ @Override
+ public Collection<Task> tasks(final Collection<TaskId> taskIds) {
final Set<Task> tasks = new HashSet<>();
for (final TaskId taskId : taskIds) {
tasks.add(task(taskId));
@@ -281,7 +307,8 @@ class Tasks {
return tasks;
}
- Collection<Task> activeTasks() {
+ @Override
+ public Collection<Task> activeTasks() {
return Collections.unmodifiableCollection(activeTasksPerId.values());
}
@@ -289,27 +316,32 @@ class Tasks {
* All tasks returned by any of the getters are read-only and should NOT
be modified;
* and the returned task could be modified by other threads concurrently
*/
- Set<Task> allTasks() {
+ @Override
+ public Set<Task> allTasks() {
return union(HashSet::new, new HashSet<>(activeTasksPerId.values()),
new HashSet<>(standbyTasksPerId.values()));
}
- Set<TaskId> allTaskIds() {
+ @Override
+ public Set<TaskId> allTaskIds() {
return union(HashSet::new, activeTasksPerId.keySet(),
standbyTasksPerId.keySet());
}
- Map<TaskId, Task> allTasksPerId() {
+ @Override
+ public Map<TaskId, Task> allTasksPerId() {
final Map<TaskId, Task> ret = new HashMap<>();
ret.putAll(activeTasksPerId);
ret.putAll(standbyTasksPerId);
return ret;
}
- boolean owned(final TaskId taskId) {
+ @Override
+ public boolean contains(final TaskId taskId) {
return getTask(taskId) != null;
}
// for testing only
- void addTask(final Task task) {
+ @Override
+ public void addTask(final Task task) {
if (task.isActive()) {
activeTasksPerId.put(task.id(), task);
} else {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
new file mode 100644
index 00000000000..d8efe334cf1
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+public interface TasksRegistry {
+
+ Map<TaskId, Set<TopicPartition>>
drainPendingActiveTasksForTopologies(final Set<String> currentTopologies);
+
+ Map<TaskId, Set<TopicPartition>>
drainPendingStandbyTasksForTopologies(final Set<String> currentTopologies);
+
+ void addPendingActiveTasksToCreate(final Map<TaskId, Set<TopicPartition>>
pendingTasks);
+
+ void addPendingStandbyTasksToCreate(final Map<TaskId, Set<TopicPartition>>
pendingTasks);
+
+ void clearPendingTasksToCreate();
+
+ Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId);
+
+ void addPendingTaskToRecycle(final TaskId taskId, final
Set<TopicPartition> inputPartitions);
+
+ Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId
taskId);
+
+ void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final
Set<TopicPartition> inputPartitions);
+
+ boolean removePendingTaskToCloseDirty(final TaskId taskId);
+
+ void addPendingTaskToCloseDirty(final TaskId taskId);
+
+ boolean removePendingTaskToCloseClean(final TaskId taskId);
+
+ void addPendingTaskToCloseClean(final TaskId taskId);
+
+ Set<Task> drainPendingTaskToInit();
+
+ void addPendingTaskToInit(final Collection<Task> tasks);
+
+ void addNewActiveTasks(final Collection<Task> newTasks);
+
+ void addNewStandbyTasks(final Collection<Task> newTasks);
+
+ void removeTask(final Task taskToRemove);
+
+ void replaceActiveWithStandby(final StandbyTask standbyTask);
+
+ void replaceStandbyWithActive(final StreamTask activeTask);
+
+ boolean updateActiveTaskInputPartitions(final Task task, final
Set<TopicPartition> topicPartitions);
+
+ void clear();
+
+ Task activeTasksForInputPartition(final TopicPartition partition);
+
+ Task task(final TaskId taskId);
+
+ Collection<Task> tasks(final Collection<TaskId> taskIds);
+
+ Collection<Task> activeTasks();
+
+ Set<Task> allTasks();
+
+ Map<TaskId, Task> allTasksPerId();
+
+ Set<TaskId> allTaskIds();
+
+ boolean contains(final TaskId taskId);
+
+ void addTask(final Task task);
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2bd1250b7ca..9edfbc10b37 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -754,6 +754,7 @@ public class StreamThreadTest {
null,
null,
null,
+ new Tasks(new LogContext()),
topologyMetadata,
null,
null,
@@ -857,6 +858,7 @@ public class StreamThreadTest {
null,
activeTaskCreator,
standbyTaskCreator,
+ new Tasks(new LogContext()),
topologyMetadata,
null,
null,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index b65b3828533..aa112110af7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.TopologyConfig;
@@ -202,6 +203,7 @@ public class TaskManagerTest {
"taskManagerTest",
activeTaskCreator,
standbyTaskCreator,
+ new Tasks(new LogContext()),
topologyMetadata,
adminClient,
stateDirectory,
@@ -540,7 +542,7 @@ public class TaskManagerTest {
"taskManagerTest",
activeTaskCreator,
standbyTaskCreator,
- topologyMetadata,
+ new Tasks(new LogContext()), topologyMetadata,
adminClient,
stateDirectory,
stateUpdater
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index 6265fd4ca2a..e51d4ea0f1f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -70,9 +70,9 @@ public class TasksTest {
mkEntry(standbyTask.id(), standbyTask)
),
tasks.allTasksPerId());
- assertTrue(tasks.owned(statefulTask.id()));
- assertTrue(tasks.owned(statelessTask.id()));
- assertTrue(tasks.owned(statefulTask.id()));
+ assertTrue(tasks.contains(statefulTask.id()));
+ assertTrue(tasks.contains(statelessTask.id()));
+ assertTrue(tasks.contains(statefulTask.id()));
}
@Test