This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 87264e67143 KAFKA-15045: (KIP-924 pt. 23) More TaskAssignmentUtils tests (#16292)
87264e67143 is described below
commit 87264e671433c7632c1cc6872289ad77a740d9f1
Author: Antoine Pourchet <[email protected]>
AuthorDate: Wed Jun 12 15:25:47 2024 -0600
KAFKA-15045: (KIP-924 pt. 23) More TaskAssignmentUtils tests (#16292)
Also moved the assignment validation test (testValidateTaskAssignment) from StreamsPartitionAssignorTest to TaskAssignmentUtilsTest.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../processor/assignment/AssignmentConfigs.java | 24 +--
.../assignment/KafkaStreamsAssignment.java | 16 ++
.../streams/processor/assignment/ProcessId.java | 8 +
.../processor/assignment/TaskAssignmentUtils.java | 8 +-
.../assignment/assignors/StickyTaskAssignor.java | 23 +-
.../internals/StreamsPartitionAssignor.java | 3 +-
.../internals/StreamsPartitionAssignorTest.java | 126 -----------
.../processor/internals/TaskManagerTest.java | 3 +-
.../assignment/KafkaStreamsAssignmentTest.java | 49 +++++
.../assignment/KafkaStreamsStateTest.java | 5 +-
.../assignment/TaskAssignmentUtilsTest.java | 233 ++++++++++++++++++++-
11 files changed, 333 insertions(+), 165 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java
index 6a7ca68a50f..abd9d50b8f4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java
@@ -20,8 +20,6 @@ import java.util.List;
import java.util.OptionalInt;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.streams.StreamsConfig;
-import
org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor;
-import
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
/**
* Assignment related configs for the Kafka Streams {@link TaskAssignor}.
@@ -43,26 +41,8 @@ public class AssignmentConfigs {
final long probingRebalanceIntervalMs =
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
final List<String> rackAwareAssignmentTags =
configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
final String rackAwareAssignmentStrategy =
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
- Integer rackAwareTrafficCost =
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
- Integer rackAwareNonOverlapCost =
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
-
- final String assignorClassName =
configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
- if (StickyTaskAssignor.class.getName().equals(assignorClassName)) {
- if (rackAwareTrafficCost == null) {
- rackAwareTrafficCost =
StickyTaskAssignor.DEFAULT_STICKY_TRAFFIC_COST;
- }
- if (rackAwareNonOverlapCost == null) {
- rackAwareNonOverlapCost =
StickyTaskAssignor.DEFAULT_STICKY_NON_OVERLAP_COST;
- }
- } else if
(HighAvailabilityTaskAssignor.class.getName().equals(assignorClassName)) {
- // TODO KAFKA-16869: replace with the HighAvailabilityTaskAssignor
class once it implements the new TaskAssignor interface
- if (rackAwareTrafficCost == null) {
- rackAwareTrafficCost =
HighAvailabilityTaskAssignor.DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST;
- }
- if (rackAwareNonOverlapCost == null) {
- rackAwareNonOverlapCost =
HighAvailabilityTaskAssignor.DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST;
- }
- }
+ final Integer rackAwareTrafficCost =
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
+ final Integer rackAwareNonOverlapCost =
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
return new AssignmentConfigs(
acceptableRecoveryLag,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java
index f5205c8422b..848219d8c72 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.assignment;
import static java.util.Collections.unmodifiableMap;
import java.time.Instant;
+import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -108,6 +109,16 @@ public class KafkaStreamsAssignment {
return followupRebalanceDeadline;
}
+ @Override
+ public String toString() {
+ return String.format(
+ "KafkaStreamsAssignment{%s, %s, %s}",
+ processId,
+ Arrays.toString(tasks.values().toArray(new AssignedTask[0])),
+ followupRebalanceDeadline
+ );
+ }
+
public static class AssignedTask {
private final TaskId id;
private final Type taskType;
@@ -157,5 +168,10 @@ public class KafkaStreamsAssignment {
final AssignedTask other = (AssignedTask) obj;
return this.id.equals(other.id()) && this.taskType ==
other.taskType;
}
+
+ @Override
+ public String toString() {
+ return String.format("AssignedTask{%s, %s}", taskType, id);
+ }
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java
index 9dd4025112a..0a3c2c2bfb4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java
@@ -35,6 +35,14 @@ public class ProcessId implements Comparable<ProcessId> {
return id;
}
+ /**
+ *
+ * @return a randomly generated process id
+ */
+ public static ProcessId randomProcessId() {
+ return new ProcessId(UUID.randomUUID());
+ }
+
@Override
public String toString() {
return id.toString();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
index 39a698adfa5..9af8a10cbd6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
@@ -59,16 +59,16 @@ public final class TaskAssignmentUtils {
* A simple config container for necessary parameters and optional
overrides to apply when
* running the active or standby task rack-aware optimizations.
*/
- public static class RackAwareOptimizationParams {
+ public static final class RackAwareOptimizationParams {
private final ApplicationState applicationState;
private final Optional<Integer> trafficCostOverride;
private final Optional<Integer> nonOverlapCostOverride;
private final Optional<SortedSet<TaskId>> tasksToOptimize;
private RackAwareOptimizationParams(final ApplicationState
applicationState,
- final Optional<Integer>
trafficCostOverride,
- final Optional<Integer>
nonOverlapCostOverride,
- final Optional<SortedSet<TaskId>>
tasksToOptimize) {
+ final Optional<Integer>
trafficCostOverride,
+ final Optional<Integer>
nonOverlapCostOverride,
+ final Optional<SortedSet<TaskId>>
tasksToOptimize) {
this.applicationState = applicationState;
this.trafficCostOverride = trafficCostOverride;
this.nonOverlapCostOverride = nonOverlapCostOverride;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
index 3d5e5b247f7..fe01b502a22 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
@@ -95,10 +95,15 @@ public class StickyTaskAssignor implements TaskAssignor {
final Map<ProcessId, KafkaStreamsAssignment> currentAssignments =
assignmentState.newAssignments;
- TaskAssignmentUtils.optimizeRackAwareActiveTasks(
-
RackAwareOptimizationParams.of(applicationState).forStatefulTasks(),
- currentAssignments
- );
+ final RackAwareOptimizationParams statefulTaskParams =
RackAwareOptimizationParams.of(applicationState)
+ .withTrafficCostOverride(
+
applicationState.assignmentConfigs().rackAwareTrafficCost().orElse(DEFAULT_STICKY_TRAFFIC_COST)
+ )
+ .withNonOverlapCostOverride(
+
applicationState.assignmentConfigs().rackAwareNonOverlapCost().orElse(DEFAULT_STICKY_NON_OVERLAP_COST)
+ )
+ .forStatefulTasks();
+ TaskAssignmentUtils.optimizeRackAwareActiveTasks(statefulTaskParams,
currentAssignments);
TaskAssignmentUtils.optimizeRackAwareActiveTasks(
RackAwareOptimizationParams.of(applicationState)
@@ -120,7 +125,15 @@ public class StickyTaskAssignor implements TaskAssignor {
}
final Map<ProcessId, KafkaStreamsAssignment> assignments =
assignmentState.newAssignments;
-
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(RackAwareOptimizationParams.of(applicationState),
assignments);
+
+ final RackAwareOptimizationParams optimizationParams =
RackAwareOptimizationParams.of(applicationState)
+ .withTrafficCostOverride(
+
applicationState.assignmentConfigs().rackAwareTrafficCost().orElse(DEFAULT_STICKY_TRAFFIC_COST)
+ )
+ .withNonOverlapCostOverride(
+
applicationState.assignmentConfigs().rackAwareNonOverlapCost().orElse(DEFAULT_STICKY_NON_OVERLAP_COST)
+ );
+ TaskAssignmentUtils.optimizeRackAwareStandbyTasks(optimizationParams,
assignments);
assignmentState.processOptimizedAssignments(assignments);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 38b164f1969..887ef86faf5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -93,7 +93,6 @@ import java.util.stream.Collectors;
import static java.util.Collections.unmodifiableSet;
import static java.util.Map.Entry.comparingByKey;
-import static java.util.UUID.randomUUID;
import static org.apache.kafka.common.utils.Utils.filterMap;
import static
org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
import static
org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsResult;
@@ -197,7 +196,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
// keep track of any future consumers in a "dummy" Client since we can't
decipher their subscription
- private static final ProcessId FUTURE_ID = new ProcessId(randomUUID());
+ private static final ProcessId FUTURE_ID = ProcessId.randomProcessId();
protected static final Comparator<TopicPartition> PARTITION_COMPARATOR =
Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 9c7338f748d..43c07ee3c28 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
import java.util.Arrays;
import java.util.Optional;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
@@ -60,20 +59,12 @@ import
org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.assignment.ApplicationState;
-import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
-import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.assignment.ProcessId;
-import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
-import org.apache.kafka.streams.processor.assignment.TaskInfo;
import
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
-import
org.apache.kafka.streams.processor.internals.assignment.DefaultApplicationState;
-import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo;
-import
org.apache.kafka.streams.processor.internals.assignment.DefaultTaskTopicPartition;
import
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
@@ -2616,123 +2607,6 @@ public class StreamsPartitionAssignorTest {
assertEquals(clientTags, partitionAssignor.clientTags());
}
- @Test
- public void testValidateTaskAssignment() {
- createDefaultMockTaskManager();
- configureDefaultPartitionAssignor();
-
- final StreamsConfig streamsConfig = new StreamsConfig(configProps());
- final AssignmentConfigs assignmentConfigs =
AssignmentConfigs.of(streamsConfig);
- final Set<TaskInfo> tasks = mkSet(
- new DefaultTaskInfo(
- new TaskId(1, 1),
- false,
- mkSet(),
- mkSet(
- new DefaultTaskTopicPartition(
- new TopicPartition("t1", 1),
- true,
- false,
- () -> { }
- )
- )
- )
- );
-
- final ProcessId clientUuid1 = new ProcessId(UUID.randomUUID());
- final ProcessId clientUuid2 = new ProcessId(UUID.randomUUID());
- final Map<ProcessId, StreamsPartitionAssignor.ClientMetadata> clients
= mkMap(
- mkEntry(clientUuid1, new
StreamsPartitionAssignor.ClientMetadata(clientUuid1, "endpoint1:80", mkMap(),
Optional.empty())),
- mkEntry(clientUuid2, new
StreamsPartitionAssignor.ClientMetadata(clientUuid1, "endpoint2:80", mkMap(),
Optional.empty()))
- );
- final ApplicationState applicationState = new DefaultApplicationState(
- assignmentConfigs,
- tasks.stream().collect(Collectors.toMap(
- TaskInfo::id,
- t -> t
- )),
- clients
- );
-
- // ****
- final
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment
noError = new
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
- mkSet(
- KafkaStreamsAssignment.of(clientUuid1, mkSet(
- new KafkaStreamsAssignment.AssignedTask(
- new TaskId(1, 1),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
- )
- )),
- KafkaStreamsAssignment.of(clientUuid2, mkSet())
- )
- );
-
org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError
error = TaskAssignmentUtils.validateTaskAssignment(applicationState, noError);
-
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.NONE,
error);
-
- // ****
- final
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment
missingProcessId = new
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
- mkSet(
- KafkaStreamsAssignment.of(clientUuid1, mkSet(
- new KafkaStreamsAssignment.AssignedTask(
- new TaskId(1, 1),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
- )
- ))
- )
- );
- error = TaskAssignmentUtils.validateTaskAssignment(applicationState,
missingProcessId);
-
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.MISSING_PROCESS_ID,
error);
-
- // ****
- final
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment
unknownProcessId = new
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
- mkSet(
- KafkaStreamsAssignment.of(clientUuid1, mkSet(
- new KafkaStreamsAssignment.AssignedTask(
- new TaskId(1, 1),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
- )
- )),
- KafkaStreamsAssignment.of(clientUuid2, mkSet()),
- KafkaStreamsAssignment.of(new ProcessId(UUID.randomUUID()),
mkSet())
- )
- );
- error = TaskAssignmentUtils.validateTaskAssignment(applicationState,
unknownProcessId);
-
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.UNKNOWN_PROCESS_ID,
error);
-
- // ****
- final
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment
unknownTaskId = new
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
- mkSet(
- KafkaStreamsAssignment.of(clientUuid1, mkSet(
- new KafkaStreamsAssignment.AssignedTask(
- new TaskId(1, 1),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
- )
- )),
- KafkaStreamsAssignment.of(clientUuid2, mkSet(
- new KafkaStreamsAssignment.AssignedTask(
- new TaskId(13, 13),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
- )
- ))
- )
- );
- error = TaskAssignmentUtils.validateTaskAssignment(applicationState,
unknownTaskId);
-
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.UNKNOWN_TASK_ID,
error);
-
- // ****
- final
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment
activeTaskDuplicated = new
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
- mkSet(
- KafkaStreamsAssignment.of(clientUuid1, mkSet(
- new KafkaStreamsAssignment.AssignedTask(
- new TaskId(1, 1),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
- )
- )),
- KafkaStreamsAssignment.of(clientUuid2, mkSet(
- new KafkaStreamsAssignment.AssignedTask(
- new TaskId(1, 1),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
- )
- ))
- )
- );
- error = TaskAssignmentUtils.validateTaskAssignment(applicationState,
activeTaskDuplicated);
-
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
error);
- }
-
private static class CorruptedInternalTopologyBuilder extends
InternalTopologyBuilder {
private Map<Subtopology, TopicsInfo> corruptedTopicGroups;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index c6f568466e5..22fd4052b34 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.util.UUID;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeletedRecords;
@@ -240,7 +239,7 @@ public class TaskManagerTest {
final TaskManager taskManager = new TaskManager(
time,
changeLogReader,
- new ProcessId(UUID.randomUUID()),
+ ProcessId.randomProcessId(),
"taskManagerTest",
activeTaskCreator,
standbyTaskCreator,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsAssignmentTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsAssignmentTest.java
new file mode 100644
index 00000000000..ef595d142b6
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsAssignmentTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.processIdForInt;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
+import
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.junit.Test;
+
+public class KafkaStreamsAssignmentTest {
+ @Test
+ public void shouldHaveReadableString() {
+ final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(
+ processIdForInt(1),
+ mkSet(
+ new AssignedTask(TASK_0_0, AssignedTask.Type.ACTIVE),
+ new AssignedTask(TASK_0_1, AssignedTask.Type.STANDBY),
+ new AssignedTask(TASK_0_2, AssignedTask.Type.ACTIVE)
+ )
+ );
+
+ assertThat(
+ assignment.toString(),
+
equalTo("KafkaStreamsAssignment{00000000-0000-0000-0000-000000000001, "
+ + "[AssignedTask{ACTIVE, 0_2}, AssignedTask{STANDBY, 0_1},
AssignedTask{ACTIVE, 0_0}], "
+ + "Optional.empty}"));
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java
index a0b0c457a3b..b28cd3678a1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java
@@ -31,7 +31,6 @@ import java.util.Arrays;
import java.util.Optional;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.UUID;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.junit.Test;
@@ -40,7 +39,7 @@ public class KafkaStreamsStateTest {
@Test
public void shouldCorrectlyReturnTasksByLag() {
final KafkaStreamsState state = new DefaultKafkaStreamsState(
- new ProcessId(UUID.randomUUID()),
+ ProcessId.randomProcessId(),
10,
mkMap(),
mkSortedSet(NAMED_TASK_T0_0_0, NAMED_TASK_T0_0_1),
@@ -71,7 +70,7 @@ public class KafkaStreamsStateTest {
@Test
public void shouldThrowExceptionOnLagOperationsIfLagsWereNotComputed() {
final KafkaStreamsState state = new DefaultKafkaStreamsState(
- new ProcessId(UUID.randomUUID()),
+ ProcessId.randomProcessId(),
10,
mkMap(),
mkSortedSet(NAMED_TASK_T0_0_0, NAMED_TASK_T0_0_1),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
index 8eff5812284..2295a865aa7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
@@ -25,10 +25,12 @@ import static
org.apache.kafka.streams.processor.internals.assignment.Assignment
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_3;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_4;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_5;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.processIdForInt;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -50,11 +52,12 @@ import
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.Assi
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
+import org.apache.kafka.streams.processor.assignment.TaskAssignor;
import
org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils.RackAwareOptimizationParams;
import org.apache.kafka.streams.processor.assignment.TaskInfo;
import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.rules.Timeout;
@@ -208,6 +211,222 @@ public class TaskAssignmentUtilsTest {
assertThat(assignments.get(processId(5)).tasks().keySet(),
equalTo(mkSet(TASK_0_0)));
}
+ @ParameterizedTest
+ @ValueSource(strings = {
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY,
+ })
+ public void
shouldNotViolateClientTagsAssignmentDuringStandbyOptimization(final String
strategy) {
+ final AssignmentConfigs assignmentConfigs = defaultAssignmentConfigs(
+ strategy, 100, 1, 2, Collections.singletonList("az"));
+ final Map<TaskId, TaskInfo> tasks = mkMap(
+ mkTaskInfo(TASK_0_0, true, mkSet("r1")),
+ mkTaskInfo(TASK_0_1, true, mkSet("r1"))
+ );
+ final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = mkMap(
+ mkStreamState(1, 2, Optional.of("r1"), mkSet(), mkSet(), mkMap(
+ mkEntry("az", "1")
+ )),
+ mkStreamState(2, 2, Optional.of("r1"), mkSet(), mkSet(), mkMap(
+ mkEntry("az", "2")
+ )),
+ mkStreamState(3, 2, Optional.of("r1"), mkSet(), mkSet(), mkMap(
+ mkEntry("az", "3")
+ )),
+ mkStreamState(4, 2, Optional.of("r1"), mkSet(), mkSet(), mkMap(
+ mkEntry("az", "2")
+ ))
+ );
+ final ApplicationState applicationState = new TestApplicationState(
+ assignmentConfigs, kafkaStreamsStates, tasks);
+
+ final Map<ProcessId, KafkaStreamsAssignment> assignments = mkMap(
+ mkAssignment(
+ 1,
+ new AssignedTask(TASK_0_0, AssignedTask.Type.ACTIVE),
+ new AssignedTask(TASK_0_1, AssignedTask.Type.STANDBY)
+ ),
+ mkAssignment(
+ 2,
+ new AssignedTask(TASK_0_0, AssignedTask.Type.STANDBY),
+ new AssignedTask(TASK_0_1, AssignedTask.Type.ACTIVE)
+ ),
+ mkAssignment(
+ 3,
+ new AssignedTask(TASK_0_0, AssignedTask.Type.STANDBY),
+ new AssignedTask(TASK_0_1, AssignedTask.Type.STANDBY)
+ ),
+ mkAssignment(4)
+ );
+
+
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(RackAwareOptimizationParams.of(applicationState),
assignments);
+ assertThat(assignments.size(), equalTo(4));
+ assertThat(assignments.get(processId(1)).tasks().keySet(),
equalTo(mkSet(TASK_0_0, TASK_0_1)));
+ assertThat(assignments.get(processId(2)).tasks().keySet(),
equalTo(mkSet(TASK_0_0, TASK_0_1)));
+ assertThat(assignments.get(processId(3)).tasks().keySet(),
equalTo(mkSet(TASK_0_0, TASK_0_1)));
+ assertThat(assignments.get(processId(4)).tasks().keySet(),
equalTo(mkSet()));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY,
+ })
+ public void shouldOptimizeStandbyTasksWithMultipleRacks(final String
strategy) {
+ final AssignmentConfigs assignmentConfigs = defaultAssignmentConfigs(
+ strategy, 100, 1, 1, Collections.emptyList());
+ final Map<TaskId, TaskInfo> tasks = mkMap(
+ mkTaskInfo(TASK_0_0, true, mkSet("rack-1", "rack-2")),
+ mkTaskInfo(TASK_0_1, true, mkSet("rack-2", "rack-3")),
+ mkTaskInfo(TASK_0_2, true, mkSet("rack-3", "rack-4"))
+ );
+ final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = mkMap(
+ mkStreamState(1, 2, Optional.of("rack-1")),
+ mkStreamState(2, 2, Optional.of("rack-2")),
+ mkStreamState(3, 2, Optional.of("rack-3"))
+ );
+ final ApplicationState applicationState = new TestApplicationState(
+ assignmentConfigs, kafkaStreamsStates, tasks);
+
+ final Map<ProcessId, KafkaStreamsAssignment> assignments = mkMap(
+ mkAssignment(AssignedTask.Type.ACTIVE, 1, TASK_0_0),
+ mkAssignment(AssignedTask.Type.ACTIVE, 2, TASK_0_1),
+ mkAssignment(AssignedTask.Type.ACTIVE, 3, TASK_0_2)
+ );
+
+ TaskAssignmentUtils.optimizeRackAwareActiveTasks(
+ RackAwareOptimizationParams.of(applicationState)
+ .forTasks(new TreeSet<>(mkSet(TASK_0_0, TASK_0_1, TASK_0_2))),
+ assignments
+ );
+ assertThat(assignments.size(), equalTo(3));
+ assertThat(assignments.get(processId(1)).tasks().keySet(),
equalTo(mkSet(TASK_0_0)));
+ assertThat(assignments.get(processId(2)).tasks().keySet(),
equalTo(mkSet(TASK_0_1)));
+ assertThat(assignments.get(processId(3)).tasks().keySet(),
equalTo(mkSet(TASK_0_2)));
+ }
+
+ @Test
+ public void shouldCorrectlyReturnIdentityAssignment() {
+ final AssignmentConfigs assignmentConfigs = defaultAssignmentConfigs(
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 100, 1, 1,
Collections.emptyList());
+ final Map<TaskId, TaskInfo> tasks = mkMap(
+ mkTaskInfo(TASK_0_0, true),
+ mkTaskInfo(TASK_0_1, true),
+ mkTaskInfo(TASK_0_2, true)
+ );
+ final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = mkMap(
+ mkStreamState(1, 5, Optional.empty(), mkSet(TASK_0_0, TASK_0_1,
TASK_0_2), mkSet()),
+ mkStreamState(2, 5, Optional.empty(), mkSet(), mkSet(TASK_0_0,
TASK_0_1, TASK_0_2)),
+ mkStreamState(3, 5, Optional.empty(), mkSet(), mkSet()),
+ mkStreamState(4, 5, Optional.empty(), mkSet(), mkSet()),
+ mkStreamState(5, 5, Optional.empty(), mkSet(), mkSet())
+ );
+ final ApplicationState applicationState = new TestApplicationState(
+ assignmentConfigs, kafkaStreamsStates, tasks);
+
+
+ final Map<ProcessId, KafkaStreamsAssignment> assignments =
TaskAssignmentUtils.identityAssignment(applicationState);
+ assertThat(assignments.size(), equalTo(5));
+ assertThat(assignments.get(processId(1)).tasks().keySet(),
equalTo(mkSet(TASK_0_0, TASK_0_1, TASK_0_2)));
+ assertThat(assignments.get(processId(2)).tasks().keySet(),
equalTo(mkSet(TASK_0_0, TASK_0_1, TASK_0_2)));
+ assertThat(assignments.get(processId(3)).tasks().keySet(),
equalTo(mkSet()));
+ assertThat(assignments.get(processId(4)).tasks().keySet(),
equalTo(mkSet()));
+ assertThat(assignments.get(processId(5)).tasks().keySet(),
equalTo(mkSet()));
+ }
+
+ @Test
+ public void testValidateTaskAssignment() {
+ final AssignmentConfigs assignmentConfigs = defaultAssignmentConfigs(
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 100, 1, 1,
Collections.emptyList());
+ final Map<TaskId, TaskInfo> tasks = mkMap(
+ mkTaskInfo(TASK_1_1, false)
+ );
+ final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = mkMap(
+ mkStreamState(1, 5, Optional.empty()),
+ mkStreamState(2, 5, Optional.empty())
+ );
+ final ApplicationState applicationState = new TestApplicationState(
+ assignmentConfigs, kafkaStreamsStates, tasks);
+
+ // ****
+ final
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment
noError = new
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
+ mkSet(
+ KafkaStreamsAssignment.of(processId(1), mkSet(
+ new KafkaStreamsAssignment.AssignedTask(
+ new TaskId(1, 1),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
+ )
+ )),
+ KafkaStreamsAssignment.of(processId(2), mkSet())
+ )
+ );
+
org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError
error = TaskAssignmentUtils.validateTaskAssignment(applicationState, noError);
+ assertThat(error, equalTo(TaskAssignor.AssignmentError.NONE));
+
+ // ****
+ final
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment
missingProcessId = new
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
+ mkSet(
+ KafkaStreamsAssignment.of(processId(1), mkSet(
+ new KafkaStreamsAssignment.AssignedTask(
+ new TaskId(1, 1),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
+ )
+ ))
+ )
+ );
+ error = TaskAssignmentUtils.validateTaskAssignment(applicationState,
missingProcessId);
+ assertThat(error,
equalTo(TaskAssignor.AssignmentError.MISSING_PROCESS_ID));
+
+ // ****
+ final
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment
unknownProcessId = new
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
+ mkSet(
+ KafkaStreamsAssignment.of(processId(1), mkSet(
+ new KafkaStreamsAssignment.AssignedTask(
+ new TaskId(1, 1),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
+ )
+ )),
+ KafkaStreamsAssignment.of(processId(2), mkSet()),
+ KafkaStreamsAssignment.of(ProcessId.randomProcessId(), mkSet())
+ )
+ );
+ error = TaskAssignmentUtils.validateTaskAssignment(applicationState,
unknownProcessId);
+ assertThat(error,
equalTo(TaskAssignor.AssignmentError.UNKNOWN_PROCESS_ID));
+
+ // ****
+ final
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment
unknownTaskId = new
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
+ mkSet(
+ KafkaStreamsAssignment.of(processId(1), mkSet(
+ new KafkaStreamsAssignment.AssignedTask(
+ new TaskId(1, 1),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
+ )
+ )),
+ KafkaStreamsAssignment.of(processId(2), mkSet(
+ new KafkaStreamsAssignment.AssignedTask(
+ new TaskId(13, 13),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
+ )
+ ))
+ )
+ );
+ error = TaskAssignmentUtils.validateTaskAssignment(applicationState,
unknownTaskId);
+ assertThat(error,
equalTo(TaskAssignor.AssignmentError.UNKNOWN_TASK_ID));
+
+ // ****
+ final
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment
activeTaskDuplicated = new
org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
+ mkSet(
+ KafkaStreamsAssignment.of(processId(1), mkSet(
+ new KafkaStreamsAssignment.AssignedTask(
+ new TaskId(1, 1),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
+ )
+ )),
+ KafkaStreamsAssignment.of(processId(2), mkSet(
+ new KafkaStreamsAssignment.AssignedTask(
+ new TaskId(1, 1),
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
+ )
+ ))
+ )
+ );
+ error = TaskAssignmentUtils.validateTaskAssignment(applicationState,
activeTaskDuplicated);
+ assertThat(error,
equalTo(TaskAssignor.AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES));
+ }
+
public static class TestApplicationState implements ApplicationState {
private final AssignmentConfigs assignmentConfigs;
@@ -293,6 +512,18 @@ public class TaskAssignmentUtilsTest {
);
}
+ public static Map.Entry<ProcessId, KafkaStreamsAssignment>
mkAssignment(final int client,
+
final AssignedTask... tasks) {
+ final ProcessId processId = processId(client);
+ return mkEntry(
+ processId,
+ KafkaStreamsAssignment.of(
+ processId,
+ Arrays.stream(tasks).collect(Collectors.toSet())
+ )
+ );
+ }
+
public static Map.Entry<TaskId, TaskInfo> mkTaskInfo(final TaskId taskId,
final boolean isStateful) {
return mkTaskInfo(taskId, isStateful, null);
}