Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]

2023-12-05 Thread via GitHub


mjsax merged PR #14714:
URL: https://github.com/apache/kafka/pull/14714


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]

2023-12-05 Thread via GitHub


mjsax commented on PR #14714:
URL: https://github.com/apache/kafka/pull/14714#issuecomment-1842147167

   Jenkins issues:
   - build 7: `JDK 11 and Scala 2.13` failed
   - build 6: `JDK 8 and Scala 2.12` failed
   
   Test failures seem to be unrelated to this PR. Merging.





Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]

2023-12-04 Thread via GitHub


lihaosky commented on code in PR #14714:
URL: https://github.com/apache/kafka/pull/14714#discussion_r1414713123


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java:
##
@@ -394,6 +394,106 @@ public void testDeterministic() {
 }
 }
 
+@Test
+public void testMaxFlowOnlySourceAndSink() {
+final Graph<Integer> graph1 = new Graph<>();

Review Comment:
   `graph` was used in setup



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java:
##
@@ -88,55 +93,20 @@ public Graph constructTaskGraph(
 }
 }
 
-// TODO: validate tasks in tasksForTopicGroup and taskIdList
-final SortedMap<Subtopology, Set<TaskId>> sortedTasksForTopicGroup = new TreeMap<>(tasksForTopicGroup);

Review Comment:
   It's put in `validateTasks`
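The removed TODO asked for validating `tasksForTopicGroup` against `taskIdList`, and the body of `validateTasks` is not part of this thread. The following is a minimal sketch of such a check. It assumes the check only has to confirm two things: every task belongs to exactly one subtopology, and both inputs describe the same set of tasks. `ValidateTasksSketch` is a hypothetical name. `Integer` and `String` stand in for `Subtopology` and `TaskId` so the snippet compiles without Kafka on the classpath.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public final class ValidateTasksSketch {

    private ValidateTasksSketch() {
    }

    // Hypothetical stand-in for validateTasks(): the map keys play the role of
    // Subtopology and the String values play the role of TaskId.
    public static void validateTasks(final Map<Integer, Set<String>> tasksForTopicGroup,
                                     final List<String> taskIdList) {
        final Set<String> tasksFromTopicGroups = new HashSet<>();
        for (final Set<String> tasks : tasksForTopicGroup.values()) {
            for (final String task : tasks) {
                // Each task must belong to exactly one subtopology.
                if (!tasksFromTopicGroups.add(task)) {
                    throw new IllegalArgumentException("Task " + task + " belongs to more than one subtopology");
                }
            }
        }
        final Set<String> tasksFromList = new HashSet<>(taskIdList);
        if (tasksFromList.size() != taskIdList.size()) {
            throw new IllegalArgumentException("taskIdList contains duplicate tasks");
        }
        // Both views must describe exactly the same set of tasks.
        if (!tasksFromTopicGroups.equals(tasksFromList)) {
            throw new IllegalArgumentException("tasksForTopicGroup " + tasksFromTopicGroups
                + " does not match taskIdList " + tasksFromList);
        }
    }

    public static void main(final String[] args) {
        validateTasks(Map.of(0, Set.of("0_0", "0_1"), 1, Set.of("1_0")), List.of("0_0", "0_1", "1_0"));
        System.out.println("consistent input accepted");
        try {
            validateTasks(Map.of(0, Set.of("0_0")), List.of("0_0", "1_0"));
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast with an exception, rather than silently dropping unknown tasks, makes a mismatch between the two inputs visible as soon as the graph is built.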






Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]

2023-12-04 Thread via GitHub


mjsax commented on code in PR #14714:
URL: https://github.com/apache/kafka/pull/14714#discussion_r1414683411


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java:
##
@@ -97,6 +98,17 @@ private int getCost(final TaskId taskId, final UUID processId, final boolean inC
 return 1;
 }
 
+@Test
+public void testSubtopicShouldContainAllTasks() {

Review Comment:
   `Subtopology` instead of `Subtopic`?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java:
##
@@ -88,55 +93,20 @@ public Graph constructTaskGraph(
 }
 }
 
-// TODO: validate tasks in tasksForTopicGroup and taskIdList
-final SortedMap<Subtopology, Set<TaskId>> sortedTasksForTopicGroup = new TreeMap<>(tasksForTopicGroup);

Review Comment:
   Was this code only moved, or altered?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
##
@@ -676,18 +676,28 @@ static SortedMap<TaskId, Set<TopicPartition>> getTaskTopicPartitionMap(final int
 return taskTopicPartitionMap;
 }
 
-static Map<String, Object> configProps(final boolean enableRackAwareAssignor) {
-return configProps(enableRackAwareAssignor, 0);
+static Map<Subtopology, Set<TaskId>> getTasksForTopicGroup(final int tpSize, final int partitionSize) {
+final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
+for (int i = 0; i < tpSize; i++) {
+for (int j = 0; j < partitionSize; j++) {
+final Subtopology subtopology = new Subtopology(i, null);
+tasksForTopicGroup.computeIfAbsent(subtopology, k -> new HashSet<>()).add(new TaskId(i, j));
+}
+}
+return tasksForTopicGroup;
+}
+
+static Map<String, Object> configProps(final String rackAwareConfig) {
+return configProps(rackAwareConfig, 0);
 }
 
-static Map<String, Object> configProps(final boolean enableRackAwareAssignor, final int replicaNum) {
+static Map<String, Object> configProps(final String rackAwareConfig, final int replicaNum) {
 final Map<String, Object> configurationMap = new HashMap<>();
 configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
 configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
 configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, replicaNum);
-if (enableRackAwareAssignor) {
-configurationMap.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC);
-}
+// configurationMap.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC);

Review Comment:
   Needs some cleanup
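Here is one possible shape for that cleanup, sketched under assumptions. It drops the commented-out `put` and forwards the `rackAwareConfig` string only when a test passes one. The config keys are written out as the string values assumed for the corresponding `StreamsConfig` constants. `APPLICATION_ID` and `USER_END_POINT` are placeholder values, so the sketch runs without Kafka.

```java
import java.util.HashMap;
import java.util.Map;

public final class ConfigPropsSketch {

    // Assumed to mirror the string values of the corresponding StreamsConfig constants.
    static final String APPLICATION_ID_CONFIG = "application.id";
    static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
    static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy";

    // Placeholder values standing in for the test constants APPLICATION_ID and USER_END_POINT.
    static final String APPLICATION_ID = "test-application";
    static final String USER_END_POINT = "localhost:8080";

    private ConfigPropsSketch() {
    }

    public static Map<String, Object> configProps(final String rackAwareConfig) {
        return configProps(rackAwareConfig, 0);
    }

    public static Map<String, Object> configProps(final String rackAwareConfig, final int replicaNum) {
        final Map<String, Object> configurationMap = new HashMap<>();
        configurationMap.put(APPLICATION_ID_CONFIG, APPLICATION_ID);
        configurationMap.put(BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
        configurationMap.put(NUM_STANDBY_REPLICAS_CONFIG, replicaNum);
        // Only override the strategy when the caller asks for one; otherwise the default applies.
        if (rackAwareConfig != null) {
            configurationMap.put(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, rackAwareConfig);
        }
        return configurationMap;
    }
}
```

Passing the strategy as a string, rather than a boolean, lets the same helper cover every strategy value without another overload.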



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java:
##
@@ -394,6 +394,106 @@ public void testDeterministic() {
 }
 }
 
+@Test
+public void testMaxFlowOnlySourceAndSink() {
+final Graph<Integer> graph1 = new Graph<>();

Review Comment:
   Nit: Why `graph1` and not `graph`? (similar below)



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptySet;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomClientState;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMap;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTasksForTopicGroup;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.ArrayList;
+import 

Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]

2023-11-29 Thread via GitHub


lihaosky commented on code in PR #14714:
URL: https://github.com/apache/kafka/pull/14714#discussion_r1409723863


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java:
##
@@ -0,0 +1,186 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor.GetCostFunction;
+
+public class BalanceSubtopologyGraphConstructor implements RackAwareGraphConstructor {
+
+private final Map<Subtopology, Set<TaskId>> tasksForTopicGroup;
+
+public BalanceSubtopologyGraphConstructor(final Map<Subtopology, Set<TaskId>> tasksForTopicGroup) {
+this.tasksForTopicGroup = tasksForTopicGroup;
+}
+
+@Override
+public int getSinkNodeID(final List<TaskId> taskIdList, final List<UUID> clientList,
+final Map<Subtopology, Set<TaskId>> tasksForTopicGroup) {
+return clientList.size() + taskIdList.size() + clientList.size() * tasksForTopicGroup.size();
+}
+
+
+@Override
+public int getClientNodeId(final int clientIndex, final List<TaskId> taskIdList, final List<UUID> clientList, final int topicGroupIndex) {
+return taskIdList.size() + clientList.size() * topicGroupIndex + clientIndex;
+}
+
+@Override
+public int getClientIndex(final int clientNodeId, final List<TaskId> taskIdList, final List<UUID> clientList, final int topicGroupIndex) {
+return clientNodeId - taskIdList.size() - clientList.size() * topicGroupIndex;
+}
+
+private static int getSecondStageClientNodeId(final List<TaskId> taskIdList, final List<UUID> clientList, final Map<Subtopology, Set<TaskId>> tasksForTopicGroup, final int clientIndex) {
+return taskIdList.size() + clientList.size() * tasksForTopicGroup.size() + clientIndex;
+}
+
+@Override
+public Graph<Integer> constructTaskGraph(final List<UUID> clientList,
+final List<TaskId> taskIdList, final SortedMap<UUID, ClientState> clientStates,
+final Map<TaskId, UUID> taskClientMap, final Map<UUID, Integer> originalAssignedTaskNumber,
+final BiPredicate<ClientState, TaskId> hasAssignedTask, final GetCostFunction getCostFunction, final int trafficCost,
+final int nonOverlapCost, final boolean hasReplica, final boolean isStandby) {
+final Graph<Integer> graph = new Graph<>();
+
+for (final TaskId taskId : taskIdList) {
+for (final Entry<UUID, ClientState> clientState : clientStates.entrySet()) {
+if (hasAssignedTask.test(clientState.getValue(), taskId)) {
+originalAssignedTaskNumber.merge(clientState.getKey(), 1, Integer::sum);
+}
+}
+}
+
+// TODO: validate tasks in tasksForTopicGroup and taskIdList
+final SortedMap<Subtopology, Set<TaskId>> sortedTasksForTopicGroup = new TreeMap<>(tasksForTopicGroup);
+final int sinkId = getSinkNodeID(taskIdList, clientList, tasksForTopicGroup);
+
+int taskNodeId = 0;
+int topicGroupIndex = 0;
+for (final Entry<Subtopology, Set<TaskId>> kv : sortedTasksForTopicGroup.entrySet()) {
+final SortedSet<TaskId> taskIds = new TreeSet<>(kv.getValue());
+for (int clientIndex = 0; clientIndex < clientList.size(); clientIndex++) {
+final UUID processId = clientList.get(clientIndex);
+final int clientNodeId = getClientNodeId(clientIndex, taskIdList, clientList, topicGroupIndex);
+int startingTaskNodeId = taskNodeId;
+for (final TaskId taskId : taskIds) {
+final int flow = hasAssignedTask.test(clientStates.get(processId), taskId) ? 1 : 0;
+graph.addEdge(startingTaskNodeId, clientNodeId, 1, getCostFunction.getCost(taskId, processId, false, trafficCost, nonOverlapCost, isStandby), flow);
+graph.addEdge(SOURCE_ID, startingTaskNodeId, 1, 0, 0);
+startingTaskNodeId++;
+}
+
+final int secondStageClientNodeId = getSecondStageClientNodeId(taskIdList, clientList, tasksForTopicGroup, clientIndex);

Review Comment:
   Right, I initially iterated over tasks in the outer loop and found this issue. Will make it more readable.
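The node numbering in `BalanceSubtopologyGraphConstructor` is easier to follow with concrete numbers. The sketch below reproduces the ID arithmetic from the diff. It uses hypothetical `numTasks`, `numClients` and `numSubtopologies` parameters in place of `taskIdList.size()`, `clientList.size()` and `tasksForTopicGroup.size()`. The layout is:

- Task nodes take `[0, numTasks)`.
- Each (subtopology, client) pair gets its own first-stage client node.
- Each client then gets one second-stage node.
- The sink comes last.

```java
public final class BalanceSubtopologyNodeIds {

    // Mirrors SOURCE_ID in the interface; the source sits outside the non-negative ID range.
    public static final int SOURCE_ID = -1;

    private BalanceSubtopologyNodeIds() {
    }

    // First-stage client nodes: one per (subtopology, client) pair, placed after the task nodes.
    public static int clientNodeId(final int clientIndex, final int numTasks, final int numClients, final int topicGroupIndex) {
        return numTasks + numClients * topicGroupIndex + clientIndex;
    }

    // Inverse of clientNodeId for a given subtopology.
    public static int clientIndex(final int clientNodeId, final int numTasks, final int numClients, final int topicGroupIndex) {
        return clientNodeId - numTasks - numClients * topicGroupIndex;
    }

    // Second-stage client nodes: one per client, after all first-stage client nodes.
    public static int secondStageClientNodeId(final int clientIndex, final int numTasks, final int numClients, final int numSubtopologies) {
        return numTasks + numClients * numSubtopologies + clientIndex;
    }

    // The sink follows the last second-stage client node.
    public static int sinkNodeId(final int numTasks, final int numClients, final int numSubtopologies) {
        return numClients + numTasks + numClients * numSubtopologies;
    }

    public static void main(final String[] args) {
        final int numTasks = 4, numClients = 2, numSubtopologies = 2;
        for (int g = 0; g < numSubtopologies; g++) {
            for (int c = 0; c < numClients; c++) {
                System.out.printf("subtopology %d, client %d -> node %d%n", g, c, clientNodeId(c, numTasks, numClients, g));
            }
        }
        for (int c = 0; c < numClients; c++) {
            System.out.printf("second stage, client %d -> node %d%n", c, secondStageClientNodeId(c, numTasks, numClients, numSubtopologies));
        }
        System.out.println("sink -> node " + sinkNodeId(numTasks, numClients, numSubtopologies));
    }
}
```

With 4 tasks, 2 clients and 2 subtopologies, the first-stage client nodes are 4 to 7, the second-stage nodes are 8 and 9, and the sink is 10. The ranges therefore never overlap.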






Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]

2023-11-28 Thread via GitHub


mjsax commented on code in PR #14714:
URL: https://github.com/apache/kafka/pull/14714#discussion_r1408740752


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructor.java:
##
@@ -0,0 +1,47 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor.GetCostFunction;
+
+/**
+ * Construct graph for rack aware task assignor
+ */
+public interface RackAwareGraphConstructor {
+int SOURCE_ID = -1;
+
+int getSinkNodeID(final List<TaskId> taskIdList, final List<UUID> clientList, final Map<Subtopology, Set<TaskId>> tasksForTopicGroup);
+
+int getClientNodeId(final int clientIndex, final List<TaskId> taskIdList, final List<UUID> clientList, final int topicGroupIndex);
+
+int getClientIndex(final int clientNodeId, final List<TaskId> taskIdList, final List<UUID> clientList, final int topicGroupIndex);
+
+Graph<Integer> constructTaskGraph(final List<UUID> clientList,
+final List<TaskId> taskIdList,

Review Comment:
   nit: align indentation (same below)



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/MinTrafficGraphConstructor.java:
##
@@ -0,0 +1,162 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor.GetCostFunction;
+
+public class MinTrafficGraphConstructor implements RackAwareGraphConstructor {
+
+@Override
+public int getSinkNodeID(final List<TaskId> taskIdList, final List<UUID> clientList,
+final Map<Subtopology, Set<TaskId>> tasksForTopicGroup) {
+return clientList.size() + taskIdList.size();
+}
+
+@Override
+public int getClientNodeId(final int clientIndex, final List<TaskId> taskIdList, final List<UUID> clientList, final int topicGroupIndex) {
+return clientIndex + taskIdList.size();
+}
+
+@Override
+public int getClientIndex(final int clientNodeId, final List<TaskId> taskIdList, final List<UUID> clientList, final int topicGroupIndex) {
+return clientNodeId - taskIdList.size();
+}
+
+@Override
+public Graph<Integer> constructTaskGraph(final List<UUID> clientList,
+final List<TaskId> taskIdList, final SortedMap<UUID, ClientState> clientStates,
+final Map<TaskId, UUID> taskClientMap, final Map<UUID, Integer> originalAssignedTaskNumber,
+final BiPredicate<ClientState, TaskId> hasAssignedTask, final GetCostFunction getCostFunction, final int trafficCost,
+final int nonOverlapCost, final boolean hasReplica, final boolean isStandby) {
+
+final Graph<Integer> graph = new Graph<>();
+
+for (final TaskId taskId : taskIdList) {
+for (final Entry<UUID, ClientState> clientState : clientStates.entrySet()) {
+if (hasAssignedTask.test(clientState.getValue(), taskId)) {
+originalAssignedTaskNumber.merge(clientState.getKey(), 1, Integer::sum);
+}
+}
+}
+
+// Make task and client Node id in graph deterministic
+for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) {
+final TaskId taskId = taskIdList.get(taskNodeId);
+for (int j = 0; j < clientList.size(); j++) {
+final int clientNodeId = getClientNodeId(j, taskIdList, clientList, 0);

Review Comment:
   Given that this is the min-cost case, do we need to pass `clientList`, or could we pass `null` (similar to calling `getSinkNodeID(...)` with `null` below)?
   
   Should we pass `0` or `-1` to better highlight that the argument is "unused"?
   
   Might be worth adding a comment?
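For comparison, the min-traffic layout in the diff depends only on the number of tasks T and clients C:

- Tasks take `[0, T)`.
- Clients take `[T, T + C)`.
- The sink is `T + C`.

So `topicGroupIndex` has no effect, and neither does `clientList` in the client ID methods. The sketch below uses hypothetical names and sizes instead of lists. It passes a named `-1` constant for the ignored index, which is one of the two options raised in the review comment.

```java
public final class MinTrafficNodeIds {

    // Hypothetical marker for an argument the min-traffic layout ignores.
    public static final int UNUSED_TOPIC_GROUP_INDEX = -1;

    private MinTrafficNodeIds() {
    }

    // Client nodes follow the task nodes. topicGroupIndex is ignored because the
    // min-traffic graph has a single client node per client.
    public static int clientNodeId(final int clientIndex, final int numTasks, final int topicGroupIndex) {
        return clientIndex + numTasks;
    }

    // Inverse of clientNodeId; topicGroupIndex is ignored here as well.
    public static int clientIndex(final int clientNodeId, final int numTasks, final int topicGroupIndex) {
        return clientNodeId - numTasks;
    }

    // The sink follows the last client node.
    public static int sinkNodeId(final int numTasks, final int numClients) {
        return numClients + numTasks;
    }

    public static void main(final String[] args) {
        final int numTasks = 3, numClients = 2;
        for (int c = 0; c < numClients; c++) {
            System.out.println("client " + c + " -> node " + clientNodeId(c, numTasks, UNUSED_TOPIC_GROUP_INDEX));
        }
        System.out.println("sink -> node " + sinkNodeId(numTasks, numClients));
    }
}
```

A named constant documents at the call site that the value is ignored, whichever of `0` or `-1` is chosen.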




[PR] KAFKA-15022: introduce interface to control graph constructor [kafka]

2023-11-07 Thread via GitHub


lihaosky opened a new pull request, #14714:
URL: https://github.com/apache/kafka/pull/14714

   ### Description
   Refactor graph construction and assignment in `RackAwareAssignor` into a new interface. Will add the implementation for the subtopology case and unit tests later.
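The intent of the refactoring is that the assignor delegates graph construction to an interchangeable implementation. A heavily simplified model can illustrate this. Only the sink-ID method of the interface is kept, and sizes replace the real lists and maps. The `forStrategy` factory and the strategy strings (`min_traffic`, `balance_subtopology`) are assumptions for illustration rather than code from this PR.

```java
import java.util.Locale;

public final class GraphConstructorSelection {

    // Simplified stand-in for RackAwareGraphConstructor: only the sink ID method is modelled.
    public interface GraphConstructor {
        int sinkNodeId(int numTasks, int numClients, int numSubtopologies);
    }

    // One client node per client, so the subtopology count is irrelevant.
    static final class MinTraffic implements GraphConstructor {
        @Override
        public int sinkNodeId(final int numTasks, final int numClients, final int numSubtopologies) {
            return numClients + numTasks;
        }
    }

    // One client node per (subtopology, client) pair plus one second-stage node per client.
    static final class BalanceSubtopology implements GraphConstructor {
        @Override
        public int sinkNodeId(final int numTasks, final int numClients, final int numSubtopologies) {
            return numClients + numTasks + numClients * numSubtopologies;
        }
    }

    private GraphConstructorSelection() {
    }

    // Hypothetical factory: the assignor picks an implementation once and then only uses the interface.
    public static GraphConstructor forStrategy(final String strategy) {
        switch (strategy.toLowerCase(Locale.ROOT)) {
            case "min_traffic":
                return new MinTraffic();
            case "balance_subtopology":
                return new BalanceSubtopology();
            default:
                throw new IllegalArgumentException("Unsupported strategy: " + strategy);
        }
    }

    public static void main(final String[] args) {
        for (final String strategy : new String[] {"min_traffic", "balance_subtopology"}) {
            System.out.println(strategy + " sink id: " + forStrategy(strategy).sinkNodeId(4, 2, 2));
        }
    }
}
```

Keeping the strategy-specific ID arithmetic behind one interface means the max-flow and cost logic in the assignor does not need to know which graph layout it is working with.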
   

