Repository: samza
Updated Branches:
  refs/heads/master f4ba9cdc1 -> 8a22e05c9


http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
 
b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
index 2bf6cee..385083b 100644
--- 
a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
+++ 
b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
@@ -21,88 +21,265 @@ package org.apache.samza.container.grouper.stream;
 
 import static org.junit.Assert.*;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestGroupBySystemStreamPartition {
-  SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", 
new Partition(0));
-  SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", 
new Partition(1));
-  SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", 
new Partition(2));
-  SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", 
new Partition(0));
-  GroupBySystemStreamPartitionFactory grouperFactory = new 
GroupBySystemStreamPartitionFactory();
+  private SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", 
"StreamA", new Partition(0));
+  private SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", 
"StreamA", new Partition(1));
+  private SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", 
"StreamA", new Partition(2));
+  private SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", 
"StreamB", new Partition(0));
+  private GroupBySystemStreamPartitionFactory grouperFactory = new 
GroupBySystemStreamPartitionFactory();
 
   @Test
   public void testLocalStreamGroupedCorrectly() {
-    HashSet<SystemStreamPartition> allSSPs = new 
HashSet<SystemStreamPartition>();
-    HashMap<String, String> configMap = new HashMap<String, String>();
-    Config config = new MapConfig(configMap);
-
-    SystemStreamPartitionGrouper grouper = 
grouperFactory.getSystemStreamPartitionGrouper(config);
-    Map<TaskName, Set<SystemStreamPartition>> emptyResult = 
grouper.group(allSSPs);
+    SystemStreamPartitionGrouper grouper = 
grouperFactory.getSystemStreamPartitionGrouper(new MapConfig());
+    Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(new 
HashSet<>());
     assertTrue(emptyResult.isEmpty());
 
-    Collections.addAll(allSSPs, aa0, aa1, aa2, ac0);
-    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
-    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new 
HashMap<TaskName, Set<SystemStreamPartition>>();
+    Map<TaskName, Set<SystemStreamPartition>> result = 
grouper.group(ImmutableSet.of(aa0, aa1, aa2, ac0));
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = 
ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+                .put(new TaskName(aa0.toString()), ImmutableSet.of(aa0))
+                .put(new TaskName(aa1.toString()), ImmutableSet.of(aa1))
+                .put(new TaskName(aa2.toString()), ImmutableSet.of(aa2))
+                .put(new TaskName(ac0.toString()), ImmutableSet.of(ac0))
+                .build();
 
-    HashSet<SystemStreamPartition> partitionaa0 = new 
HashSet<SystemStreamPartition>();
-    partitionaa0.add(aa0);
-    expectedResult.put(new TaskName(aa0.toString()), partitionaa0);
+    assertEquals(expectedResult, result);
+  }
 
-    HashSet<SystemStreamPartition> partitionaa1 = new 
HashSet<SystemStreamPartition>();
-    partitionaa1.add(aa1);
-    expectedResult.put(new TaskName(aa1.toString()), partitionaa1);
+  @Test
+  public void testBroadcastStreamGroupedCorrectly() {
+    Config config = new MapConfig(ImmutableMap.of("task.broadcast.inputs", 
"SystemA.StreamA#0"));
 
-    HashSet<SystemStreamPartition> partitionaa2 = new 
HashSet<SystemStreamPartition>();
-    partitionaa2.add(aa2);
-    expectedResult.put(new TaskName(aa2.toString()), partitionaa2);
+    SystemStreamPartitionGrouper grouper = new 
GroupBySystemStreamPartition(config);
+    Map<TaskName, Set<SystemStreamPartition>> result = 
grouper.group(ImmutableSet.of(aa0, aa1, aa2, ac0));
 
-    HashSet<SystemStreamPartition> partitionac0 = new 
HashSet<SystemStreamPartition>();
-    partitionac0.add(ac0);
-    expectedResult.put(new TaskName(ac0.toString()), partitionac0);
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = 
ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName(aa1.toString()), ImmutableSet.of(aa1, aa0))
+            .put(new TaskName(aa2.toString()), ImmutableSet.of(aa2, aa0))
+            .put(new TaskName(ac0.toString()), ImmutableSet.of(ac0, aa0))
+            .build();
 
     assertEquals(expectedResult, result);
   }
 
   @Test
-  public void testBroadcastStreamGroupedCorrectly() {
-    HashMap<String, String> configMap = new HashMap<String, String>();
-    configMap.put("task.broadcast.inputs", "SystemA.StreamA#0");
-    Config config = new MapConfig(configMap);
+  public void testSingleStreamRepartitioning() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithSingleStream = 
ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .build();
 
-    HashSet<SystemStreamPartition> allSSPs = new 
HashSet<SystemStreamPartition>();
-    Collections.addAll(allSSPs, aa0, aa1, aa2, ac0);
-    GroupBySystemStreamPartitionFactory grouperFactory = new 
GroupBySystemStreamPartitionFactory();
-    SystemStreamPartitionGrouper grouper = 
grouperFactory.getSystemStreamPartitionGrouper(config);
-    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+            .mapToObj(partitionId -> new SystemStreamPartition("kafka", "PVE", 
new Partition(partitionId)))
+            .collect(Collectors.toSet());
 
-    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new 
HashMap<TaskName, Set<SystemStreamPartition>>();
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = 
ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)),
+                    new SystemStreamPartition("kafka", "PVE", new 
Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)),
+                    new SystemStreamPartition("kafka", "PVE", new 
Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(7)),
+                    new SystemStreamPartition("kafka", "PVE", new 
Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)),
+                    new SystemStreamPartition("kafka", "PVE", new 
Partition(6))))
+            .build();
 
-    HashSet<SystemStreamPartition> partitionaa1 = new 
HashSet<SystemStreamPartition>();
-    partitionaa1.add(aa1);
-    partitionaa1.add(aa0);
-    expectedResult.put(new TaskName(aa1.toString()), partitionaa1);
+    SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new 
MapConfig(), new GroupBySystemStreamPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new 
HashMap<>(), prevGroupingWithSingleStream, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = 
groupBySystemStreamPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
+  }
 
-    HashSet<SystemStreamPartition> partitionaa2 = new 
HashSet<SystemStreamPartition>();
-    partitionaa2.add(aa2);
-    partitionaa2.add(aa0);
-    expectedResult.put(new TaskName(aa2.toString()), partitionaa2);
+  @Test
+  public void testMultipleStreamsWithSingleStreamRepartitioning() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams 
= ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3))))
+            .build();
 
-    HashSet<SystemStreamPartition> partitionac0 = new 
HashSet<SystemStreamPartition>();
-    partitionac0.add(ac0);
-    partitionac0.add(aa0);
-    expectedResult.put(new TaskName(ac0.toString()), partitionac0);
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+            .mapToObj(partitionId -> new SystemStreamPartition("kafka", "PVE", 
new Partition(partitionId)))
+            .collect(Collectors.toSet());
+    IntStream.range(0, 8).forEach(partitionId -> currSsps.add(new 
SystemStreamPartition("kafka", "BOB", new Partition(partitionId))));
+    IntStream.range(0, 4).forEach(partitionId -> currSsps.add(new 
SystemStreamPartition("kafka", "URE", new Partition(partitionId))));
 
-    assertEquals(expectedResult, result);
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = 
ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 7]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 6]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 4]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 5]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)),
+                                                                               
         new SystemStreamPartition("kafka", "PVE", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)),
+                                                                               
         new SystemStreamPartition("kafka", "PVE", new Partition(6))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)),
+                                                                               
         new SystemStreamPartition("kafka", "PVE", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)),
+                                                                               
         new SystemStreamPartition("kafka", "PVE", new Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0))))
+            .build();
+
+    SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new 
MapConfig(), new GroupBySystemStreamPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new 
HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = 
groupBySystemStreamPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
+  }
+
+  @Test
+  public void testOnlyNewlyAddedStreams() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams 
= ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .build();
+
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+            .mapToObj(partitionId -> new SystemStreamPartition("kafka", "BOB", 
new Partition(partitionId)))
+            .collect(Collectors.toSet());
+
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = 
ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 5]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 4]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 7]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 6]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6))))
+            .build();
+
+    SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new 
MapConfig(), new GroupBySystemStreamPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new 
HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = 
groupBySystemStreamPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
+  }
+
+
+  @Test
+  public void testRemovalAndAdditionOfStreamsWithRepartitioning() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams 
= ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3))))
+            .build();
+
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+            .mapToObj(partitionId -> new SystemStreamPartition("kafka", "PVE", 
new Partition(partitionId)))
+            .collect(Collectors.toSet());
+    IntStream.range(0, 8).forEach(partitionId -> currSsps.add(new 
SystemStreamPartition("kafka", "BOB", new Partition(partitionId))));
+
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = 
ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(5)),
+                    new SystemStreamPartition("kafka", "PVE", new 
Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(6)),
+                    new SystemStreamPartition("kafka", "PVE", new 
Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(7)),
+                    new SystemStreamPartition("kafka", "PVE", new 
Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(4)),
+                    new SystemStreamPartition("kafka", "PVE", new 
Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 7]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 6]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 5]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 4]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(0))))
+            .build();
+
+    SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new 
MapConfig(), new GroupBySystemStreamPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new 
HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = 
groupBySystemStreamPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
+  }
+
+  @Test
+  public void testMultipleStreamRepartitioningWithNewStreams() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams 
= ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3))))
+            .build();
+
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+                                                   .mapToObj(partitionId -> 
new SystemStreamPartition("kafka", "PVE", new Partition(partitionId)))
+                                                   
.collect(Collectors.toSet());
+    IntStream.range(0, 8).forEach(partitionId -> currSsps.add(new 
SystemStreamPartition("kafka", "BOB", new Partition(partitionId))));
+    IntStream.range(0, 8).forEach(partitionId -> currSsps.add(new 
SystemStreamPartition("kafka", "URE", new Partition(partitionId))));
+
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = 
ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 5]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 4]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 7]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 6]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)),
+                                                                               
         new SystemStreamPartition("kafka", "PVE", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)),
+                                                                               
         new SystemStreamPartition("kafka", "PVE", new Partition(6))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)),
+                                                                               
         new SystemStreamPartition("kafka", "PVE", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)),
+                                                                               
         new SystemStreamPartition("kafka", "PVE", new Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1)),
+                                                                               
         new SystemStreamPartition("kafka", "URE", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2)),
+                                                                               
         new SystemStreamPartition("kafka", "URE", new Partition(6))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3)),
+                                                                               
         new SystemStreamPartition("kafka", "URE", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), 
ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0)),
+                                                                               
         new SystemStreamPartition("kafka", "URE", new Partition(4))))
+            .build();
+
+    SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new 
MapConfig(), new GroupBySystemStreamPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new 
HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = 
groupBySystemStreamPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
   }
 }

Reply via email to