mjsax commented on code in PR #14108:
URL: https://github.com/apache/kafka/pull/14108#discussion_r1282569306


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -715,23 +1087,49 @@ private Map<UUID, Map<String, Optional<String>>> 
getRandomProcessRacks(final int
         return processRacks;
     }
 
-    private SortedMap<TaskId, Set<TopicPartition>> 
getTaskTopicPartitionMap(final int tpSize) {
+    private SortedMap<TaskId, Set<TopicPartition>> 
getTaskTopicPartitionMap(final int tpSize, final boolean changelog) {
         final SortedMap<TaskId, Set<TopicPartition>> taskTopicPartitionMap = 
new TreeMap<>();
+        final String topicName = changelog ? CHANGELOG_TOPIC_PREFIX : 
TOPIC_PREFIX;
         for (int i = 0; i < tpSize; i++) {
-            taskTopicPartitionMap.put(new TaskId(i, 0), mkSet(new 
TopicPartition("topic" + i, 0)));
+            taskTopicPartitionMap.put(new TaskId(i, 0), mkSet(
+                new TopicPartition(topicName + i, 0),
+                new TopicPartition(topicName + (i + 1) % tpSize, 0)
+            ));
         }
         return taskTopicPartitionMap;
     }
 
-    private SortedMap<UUID, ClientState> getRandomClientState(final int 
clientSize, final int tpSize) {
+    private InternalTopicManager 
mockInternalTopicManagerForRandomChangelog(final int nodeSize, final int 
tpSize) {
+        final Set<String> changelogNames = new HashSet<>();
+        final List<Node> nodeList = getRandomNodes(nodeSize);
+        final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new 
HashMap<>();
+        for (int i = 0; i < tpSize; i++) {
+            final String topicName = CHANGELOG_TOPIC_PREFIX + i;
+            changelogNames.add(topicName);
+
+            final Node firstNode = nodeList.get(i % nodeSize);
+            final Node secondNode = nodeList.get((i + 1) % nodeSize);
+            final TopicPartitionInfo info = new TopicPartitionInfo(0, 
firstNode, Arrays.asList(firstNode, secondNode), Collections.emptyList());
+
+            topicPartitionInfo.computeIfAbsent(topicName, tp -> new 
ArrayList<>()).add(info);
+        }
+
+        final MockInternalTopicManager spyTopicManager = 
spy(mockInternalTopicManager);
+        
doReturn(topicPartitionInfo).when(spyTopicManager).getTopicPartitionInfo(changelogNames);
+        return spyTopicManager;
+    }
+
+    private SortedMap<UUID, ClientState> getRandomClientState(final int 
clientSize, final int tpSize, final int maxCapacity) {
         final SortedMap<UUID, ClientState> clientStates = new TreeMap<>();
         final List<TaskId> taskIds = new ArrayList<>(tpSize);
         for (int i = 0; i < tpSize; i++) {
             taskIds.add(new TaskId(i, 0));
         }
         Collections.shuffle(taskIds);
+        final Random random = new Random();

Review Comment:
   We should generate our own seed and print/log the seed -- in case a test 
fails, we can use the seed to reproduce the issue.
   ```
   final long seed = System.currentTimeMillis();
   log.debug("seed: " + seed);
    final Random random = new Random(seed);
    ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -715,23 +1087,49 @@ private Map<UUID, Map<String, Optional<String>>> 
getRandomProcessRacks(final int
         return processRacks;
     }
 
-    private SortedMap<TaskId, Set<TopicPartition>> 
getTaskTopicPartitionMap(final int tpSize) {
+    private SortedMap<TaskId, Set<TopicPartition>> 
getTaskTopicPartitionMap(final int tpSize, final boolean changelog) {
         final SortedMap<TaskId, Set<TopicPartition>> taskTopicPartitionMap = 
new TreeMap<>();
+        final String topicName = changelog ? CHANGELOG_TOPIC_PREFIX : 
TOPIC_PREFIX;
         for (int i = 0; i < tpSize; i++) {
-            taskTopicPartitionMap.put(new TaskId(i, 0), mkSet(new 
TopicPartition("topic" + i, 0)));
+            taskTopicPartitionMap.put(new TaskId(i, 0), mkSet(
+                new TopicPartition(topicName + i, 0),
+                new TopicPartition(topicName + (i + 1) % tpSize, 0)
+            ));
         }
         return taskTopicPartitionMap;
     }
 
-    private SortedMap<UUID, ClientState> getRandomClientState(final int 
clientSize, final int tpSize) {
+    private InternalTopicManager 
mockInternalTopicManagerForRandomChangelog(final int nodeSize, final int 
tpSize) {
+        final Set<String> changelogNames = new HashSet<>();
+        final List<Node> nodeList = getRandomNodes(nodeSize);
+        final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new 
HashMap<>();
+        for (int i = 0; i < tpSize; i++) {
+            final String topicName = CHANGELOG_TOPIC_PREFIX + i;
+            changelogNames.add(topicName);
+
+            final Node firstNode = nodeList.get(i % nodeSize);
+            final Node secondNode = nodeList.get((i + 1) % nodeSize);
+            final TopicPartitionInfo info = new TopicPartitionInfo(0, 
firstNode, Arrays.asList(firstNode, secondNode), Collections.emptyList());
+
+            topicPartitionInfo.computeIfAbsent(topicName, tp -> new 
ArrayList<>()).add(info);
+        }
+
+        final MockInternalTopicManager spyTopicManager = 
spy(mockInternalTopicManager);
+        
doReturn(topicPartitionInfo).when(spyTopicManager).getTopicPartitionInfo(changelogNames);
+        return spyTopicManager;
+    }
+
+    private SortedMap<UUID, ClientState> getRandomClientState(final int 
clientSize, final int tpSize, final int maxCapacity) {
         final SortedMap<UUID, ClientState> clientStates = new TreeMap<>();
         final List<TaskId> taskIds = new ArrayList<>(tpSize);
         for (int i = 0; i < tpSize; i++) {
             taskIds.add(new TaskId(i, 0));
         }
         Collections.shuffle(taskIds);
+        final Random random = new Random();

Review Comment:
   We should generate our own seed and print/log the seed -- in case a test 
fails, we can use the seed to reproduce the issue.
   ```
   final long seed = System.currentTimeMillis();
   log.debug("seed: " + seed);
   final Random random = new Random(seed);
    ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to