mjsax commented on code in PR #21365:
URL: https://github.com/apache/kafka/pull/21365#discussion_r2765781911
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1414,10 +1414,19 @@ void shutdown(final boolean clean) {
// TODO: change type to `StreamTask`
final Set<Task> activeTasks = new
TreeSet<>(Comparator.comparing(Task::id));
activeTasks.addAll(tasks.activeTasks());
+ final Set<Task> standbyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
Review Comment:
nit: should this be the `StandbyTask` type (compare the comment above about the
`StreamTask` type)? Or we could also just update the comment, and try to do a
follow-up PR to improve the types later?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1414,10 +1414,19 @@ void shutdown(final boolean clean) {
// TODO: change type to `StreamTask`
final Set<Task> activeTasks = new
TreeSet<>(Comparator.comparing(Task::id));
activeTasks.addAll(tasks.activeTasks());
+ final Set<Task> standbyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
+ standbyTasks.addAll(tasks.standbyTasks());
+
+ Set<Task> pendingActiveTasks = tasks.drainPendingActiveTasksToInit();
Review Comment:
Seems this should be `final` -- wondering why the build did not fail with a
checkstyle error? Can we check this?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
Review Comment:
Not sure why this test does not verify the locking for `taskId01`?
Or is the test setup with two tasks just not necessary, and using one task is
sufficient?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3667,6 +3663,35 @@ public void
shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() {
verify(removedFailedStandbyTaskDuringRemoval).closeDirty();
}
+ @Test
+ public void shouldShutDownPendingTasksToInit() {
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+ final StandbyTask standbyTask00 = standbyTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions)
+ .build();
+
+ final StreamTask activeTask01 = statefulTask(taskId01,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
Review Comment:
as above
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.CloseOptions;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RebalanceTaskClosureIntegrationTest {
+
+ private static final int NUM_BROKERS = 1;
+ protected static final String INPUT_TOPIC_NAME = "input-topic";
+ private static final int NUM_PARTITIONS = 3;
+
+ private final EmbeddedKafkaCluster cluster = new
EmbeddedKafkaCluster(NUM_BROKERS);
+
+ private KafkaStreamsWrapper streams1;
+ private KafkaStreamsWrapper streams2;
+ private String safeTestName;
+
+ @BeforeEach
+ public void before(final TestInfo testInfo) throws InterruptedException,
IOException {
+ cluster.start();
+ cluster.createTopic(INPUT_TOPIC_NAME, NUM_PARTITIONS, 1);
+ safeTestName = safeUniqueTestName(testInfo);
+ }
+
+ @AfterEach
+ public void after() {
+ cluster.stop();
+ if (streams1 != null) {
+ streams1.close(Duration.ofSeconds(30));
+ }
+ if (streams2 != null) {
+ streams2.close(Duration.ofSeconds(30));
+ }
+ }
+
+ /**
+ * The conditions that we need to meet:
+ * <p><ul>
+ * <li>There is a task with an open store in {@link
org.apache.kafka.streams.processor.internals.TasksRegistry#pendingTasksToInit}</li>
+ * <li>StreamThread gets into PENDING_SHUTDOWN state, so that {@link
StreamThread#isStartingRunningOrPartitionAssigned} return false
Review Comment:
```suggestion
* <li>StreamThread gets into PENDING_SHUTDOWN state, so that {@link
StreamThread#isStartingRunningOrPartitionAssigned} returns false
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java:
##########
@@ -230,4 +249,18 @@ public void shouldClearAllPendingTasks() {
assertTrue(tasks.pendingActiveTasksToCreate().isEmpty());
assertTrue(tasks.pendingStandbyTasksToCreate().isEmpty());
}
+
+ @Test
+ public void shouldRemovePendingTaskToClose() {
+ final StreamTask activeTask1 = statefulTask(TASK_0_0,
Set.of(TOPIC_PARTITION_B_0))
+ .inState(State.SUSPENDED).build();
+ tasks.addPendingTasksToClose(List.of(activeTask1));
+
+ tasks.removeTask(activeTask1);
+ assertFalse(tasks.pendingTasksToInit().contains(activeTask1));
+ assertFalse(tasks.allTasks().contains(activeTask1));
+
+ tasks.addPendingTasksToClose(List.of(activeTask1));
+ assertTrue(tasks.pendingTasksToClose().contains(activeTask1));
Review Comment:
Why do we add it back and verify? Could we not do the verification above,
right after we added the task the first time?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -312,12 +312,8 @@ public void shouldLockTasksOnClose() {
final StreamTask activeTask1 = statefulTask(taskId00,
taskId00ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
- final StreamTask activeTask2 = statefulTask(taskId01,
taskId01ChangelogPartitions)
- .inState(State.RUNNING)
- .withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
Review Comment:
We remove this, because `allTasks()` is not called any longer?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1414,10 +1414,19 @@ void shutdown(final boolean clean) {
// TODO: change type to `StreamTask`
final Set<Task> activeTasks = new
TreeSet<>(Comparator.comparing(Task::id));
activeTasks.addAll(tasks.activeTasks());
+ final Set<Task> standbyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
+ standbyTasks.addAll(tasks.standbyTasks());
+
+ Set<Task> pendingActiveTasks = tasks.drainPendingActiveTasksToInit();
+ activeTasks.addAll(pendingActiveTasks);
+ tasks.addPendingTasksToClose(pendingActiveTasks);
+ Set<Task> pendingStandbyTasks = tasks.drainPendingStandbyTasksToInit();
Review Comment:
Same. Should be `final`
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3667,6 +3663,35 @@ public void
shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() {
verify(removedFailedStandbyTaskDuringRemoval).closeDirty();
}
+ @Test
+ public void shouldShutDownPendingTasksToInit() {
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+ final StandbyTask standbyTask00 = standbyTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions)
+ .build();
+
+ final StreamTask activeTask01 = statefulTask(taskId01,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions).build();
+
+
when(tasks.drainPendingStandbyTasksToInit()).thenReturn(Set.of(standbyTask00));
+
when(tasks.drainPendingActiveTasksToInit()).thenReturn(Set.of(activeTask01));
+
+ taskManager.shutdown(true);
+
+ verify(standbyTask00).prepareCommit(true);
+ verify(standbyTask00).postCommit(true);
+ verify(standbyTask00).suspend();
+ verify(standbyTask00).closeClean();
+
+ verify(activeTask01).prepareCommit(true);
+ verify(activeTask01).suspend();
Review Comment:
Why is there no `postCommit()` for the active task?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -139,6 +154,21 @@ public boolean hasPendingTasksToInit() {
return !pendingTasksToInit.isEmpty();
}
+ @Override
+ public Set<Task> pendingTasksToClose() {
+ return Collections.unmodifiableSet(pendingTasksToClose);
+ }
+
+ @Override
+ public void addPendingTasksToClose(Collection<Task> tasks) {
Review Comment:
Parameter should be `final`
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3667,6 +3663,35 @@ public void
shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() {
verify(removedFailedStandbyTaskDuringRemoval).closeDirty();
}
+ @Test
+ public void shouldShutDownPendingTasksToInit() {
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+ final StandbyTask standbyTask00 = standbyTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
Review Comment:
Should the state be `CREATED`?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3667,6 +3663,35 @@ public void
shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() {
verify(removedFailedStandbyTaskDuringRemoval).closeDirty();
}
+ @Test
+ public void shouldShutDownPendingTasksToInit() {
Review Comment:
```suggestion
public void shouldClosePendingTasksToInitDuringShutdown() {
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -312,12 +312,8 @@ public void shouldLockTasksOnClose() {
final StreamTask activeTask1 = statefulTask(taskId00,
taskId00ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
- final StreamTask activeTask2 = statefulTask(taskId01,
taskId01ChangelogPartitions)
Review Comment:
Why are we removing this task from the test? Is using a single task
sufficient (and it's unclear why we did use two tasks to begin with)?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java:
##########
@@ -164,6 +166,23 @@ public void
shouldVerifyIfPendingActiveTaskToInitAreDrained() {
assertTrue(tasks.pendingTasksToInit().containsAll(Set.of(standbyTask1,
standbyTask2)));
}
+ @Test
+ public void shouldVerifyIfPendingStandbyTaskToInitAreDrained() {
+ final StreamTask activeTask1 = statefulTask(TASK_0_0,
Set.of(TOPIC_PARTITION_B_0)).build();
+ final StreamTask activeTask2 = statefulTask(TASK_0_1,
Set.of(TOPIC_PARTITION_B_1)).build();
+ final StandbyTask standbyTask1 = standbyTask(TASK_1_0,
Set.of(TOPIC_PARTITION_A_0)).build();
+ final StandbyTask standbyTask2 = standbyTask(TASK_1_1,
Set.of(TOPIC_PARTITION_A_1)).build();
+ tasks.addPendingTasksToInit(Set.of(activeTask1, activeTask2,
standbyTask1, standbyTask2));
+
+ final Set<Task> standbyTasksToInit =
tasks.drainPendingStandbyTasksToInit();
+ assertEquals(2, standbyTasksToInit.size());
Review Comment:
```suggestion
assertEquals(2, standbyTasksToInit.size());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]