Repository: samza Updated Branches: refs/heads/master f4ba9cdc1 -> 8a22e05c9
http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java index 2bf6cee..385083b 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java @@ -21,88 +21,265 @@ package org.apache.samza.container.grouper.stream; import static org.junit.Assert.*; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.samza.Partition; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.container.TaskName; import org.apache.samza.system.SystemStreamPartition; +import org.junit.Assert; import org.junit.Test; public class TestGroupBySystemStreamPartition { - SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0)); - SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1)); - SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2)); - SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0)); - GroupBySystemStreamPartitionFactory grouperFactory = new GroupBySystemStreamPartitionFactory(); + private SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0)); + private SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1)); + private SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2)); + private SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0)); + private GroupBySystemStreamPartitionFactory grouperFactory = new GroupBySystemStreamPartitionFactory(); @Test public void testLocalStreamGroupedCorrectly() { - HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>(); - HashMap<String, String> configMap = new HashMap<String, String>(); - Config config = new MapConfig(configMap); - - SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(config); - Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(allSSPs); + SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(new MapConfig()); + Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(new HashSet<>()); assertTrue(emptyResult.isEmpty()); - Collections.addAll(allSSPs, aa0, aa1, aa2, ac0); - Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs); - Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>(); + Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(ImmutableSet.of(aa0, aa1, aa2, ac0)); + Map<TaskName, Set<SystemStreamPartition>> expectedResult = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() + .put(new TaskName(aa0.toString()), ImmutableSet.of(aa0)) + .put(new TaskName(aa1.toString()), ImmutableSet.of(aa1)) + .put(new TaskName(aa2.toString()), ImmutableSet.of(aa2)) + .put(new TaskName(ac0.toString()), ImmutableSet.of(ac0)) + .build(); - HashSet<SystemStreamPartition> partitionaa0 = new HashSet<SystemStreamPartition>(); - partitionaa0.add(aa0); - expectedResult.put(new TaskName(aa0.toString()), partitionaa0); + assertEquals(expectedResult, result); + } - HashSet<SystemStreamPartition> partitionaa1 = new HashSet<SystemStreamPartition>(); - partitionaa1.add(aa1); - expectedResult.put(new TaskName(aa1.toString()), partitionaa1); + @Test + public void testBroadcastStreamGroupedCorrectly() { + Config config = new MapConfig(ImmutableMap.of("task.broadcast.inputs", "SystemA.StreamA#0")); - HashSet<SystemStreamPartition> partitionaa2 = new HashSet<SystemStreamPartition>(); - partitionaa2.add(aa2); - expectedResult.put(new TaskName(aa2.toString()), partitionaa2); + SystemStreamPartitionGrouper grouper = new GroupBySystemStreamPartition(config); + Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(ImmutableSet.of(aa0, aa1, aa2, ac0)); - HashSet<SystemStreamPartition> partitionac0 = new HashSet<SystemStreamPartition>(); - partitionac0.add(ac0); - expectedResult.put(new TaskName(ac0.toString()), partitionac0); + Map<TaskName, Set<SystemStreamPartition>> expectedResult = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() + .put(new TaskName(aa1.toString()), ImmutableSet.of(aa1, aa0)) + .put(new TaskName(aa2.toString()), ImmutableSet.of(aa2, aa0)) + .put(new TaskName(ac0.toString()), ImmutableSet.of(ac0, aa0)) + .build(); assertEquals(expectedResult, result); } @Test - public void testBroadcastStreamGroupedCorrectly() { - HashMap<String, String> configMap = new HashMap<String, String>(); - configMap.put("task.broadcast.inputs", "SystemA.StreamA#0"); - Config config = new MapConfig(configMap); + public void testSingleStreamRepartitioning() { + Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithSingleStream = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() + .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)))) + .build(); - HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>(); - Collections.addAll(allSSPs, aa0, aa1, aa2, ac0); - GroupBySystemStreamPartitionFactory grouperFactory = new GroupBySystemStreamPartitionFactory(); - SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(config); - Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs); + Set<SystemStreamPartition> currSsps = IntStream.range(0, 8) + .mapToObj(partitionId -> new SystemStreamPartition("kafka", "PVE", new Partition(partitionId))) + .collect(Collectors.toSet()); - Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>(); + Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() + .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)), + new SystemStreamPartition("kafka", "PVE", new Partition(5)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)), + new SystemStreamPartition("kafka", "PVE", new Partition(4)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(7)), + new SystemStreamPartition("kafka", "PVE", new Partition(3)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)), + new SystemStreamPartition("kafka", "PVE", new Partition(6)))) + .build(); - HashSet<SystemStreamPartition> partitionaa1 = new HashSet<SystemStreamPartition>(); - partitionaa1.add(aa1); - partitionaa1.add(aa0); - expectedResult.put(new TaskName(aa1.toString()), partitionaa1); + SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new MapConfig(), new GroupBySystemStreamPartition(new MapConfig())); + GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithSingleStream, new HashMap<>()); + Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupBySystemStreamPartition.group(currSsps, grouperContext); + Assert.assertEquals(expectedGrouping, finalGrouping); + } - HashSet<SystemStreamPartition> partitionaa2 = new HashSet<SystemStreamPartition>(); - partitionaa2.add(aa2); - partitionaa2.add(aa0); - expectedResult.put(new TaskName(aa2.toString()), partitionaa2); + @Test + public void testMultipleStreamsWithSingleStreamRepartitioning() { + Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() + .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3)))) + .build(); - HashSet<SystemStreamPartition> partitionac0 = new HashSet<SystemStreamPartition>(); - partitionac0.add(ac0); - partitionac0.add(aa0); - expectedResult.put(new TaskName(ac0.toString()), partitionac0); + Set<SystemStreamPartition> currSsps = IntStream.range(0, 8) + .mapToObj(partitionId -> new SystemStreamPartition("kafka", "PVE", new Partition(partitionId))) + .collect(Collectors.toSet()); + IntStream.range(0, 8).forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "BOB", new Partition(partitionId)))); + IntStream.range(0, 4).forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "URE", new Partition(partitionId)))); - assertEquals(expectedResult, result); + Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() + .put(new TaskName("SystemStreamPartition [kafka, BOB, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(0)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 7]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 6]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 4]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 5]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(3)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)), + new SystemStreamPartition("kafka", "PVE", new Partition(5)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)), + new SystemStreamPartition("kafka", "PVE", new Partition(6)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)), + new SystemStreamPartition("kafka", "PVE", new Partition(7)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)), + new SystemStreamPartition("kafka", "PVE", new Partition(4)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0)))) + .build(); + + SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new MapConfig(), new GroupBySystemStreamPartition(new MapConfig())); + GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>()); + Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupBySystemStreamPartition.group(currSsps, grouperContext); + Assert.assertEquals(expectedGrouping, finalGrouping); + } + + @Test + public void testOnlyNewlyAddedStreams() { + Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() + .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)))) + .build(); + + Set<SystemStreamPartition> currSsps = IntStream.range(0, 8) + .mapToObj(partitionId -> new SystemStreamPartition("kafka", "BOB", new Partition(partitionId))) + .collect(Collectors.toSet()); + + Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() + .put(new TaskName("SystemStreamPartition [kafka, BOB, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(0)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(3)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 5]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 4]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 7]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 6]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6)))) + .build(); + + SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new MapConfig(), new GroupBySystemStreamPartition(new MapConfig())); + GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>()); + Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupBySystemStreamPartition.group(currSsps, grouperContext); + Assert.assertEquals(expectedGrouping, finalGrouping); + } + + + @Test + public void testRemovalAndAdditionOfStreamsWithRepartitioning() { + Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() + .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3)))) + .build(); + + Set<SystemStreamPartition> currSsps = IntStream.range(0, 8) + .mapToObj(partitionId -> new SystemStreamPartition("kafka", "PVE", new Partition(partitionId))) + .collect(Collectors.toSet()); + IntStream.range(0, 8).forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "BOB", new Partition(partitionId)))); + + Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() + .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(5)), + new SystemStreamPartition("kafka", "PVE", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(6)), + new SystemStreamPartition("kafka", "PVE", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(7)), + new SystemStreamPartition("kafka", "PVE", new Partition(3)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(4)), + new SystemStreamPartition("kafka", "PVE", new Partition(0)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 7]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 6]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 5]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 4]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(3)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(0)))) + .build(); + + SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new MapConfig(), new GroupBySystemStreamPartition(new MapConfig())); + GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>()); + Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupBySystemStreamPartition.group(currSsps, grouperContext); + Assert.assertEquals(expectedGrouping, finalGrouping); + } + + @Test + public void testMultipleStreamRepartitioningWithNewStreams() { + Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() + .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3)))) + .build(); + + Set<SystemStreamPartition> currSsps = IntStream.range(0, 8) + .mapToObj(partitionId -> new SystemStreamPartition("kafka", "PVE", new Partition(partitionId))) + .collect(Collectors.toSet()); + IntStream.range(0, 8).forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "BOB", new Partition(partitionId)))); + IntStream.range(0, 8).forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "URE", new Partition(partitionId)))); + + Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder() + .put(new TaskName("SystemStreamPartition [kafka, BOB, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(1)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(0)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(3)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(2)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 5]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 4]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 7]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7)))) + .put(new TaskName("SystemStreamPartition [kafka, BOB, 6]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)), + new SystemStreamPartition("kafka", "PVE", new Partition(5)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)), + new SystemStreamPartition("kafka", "PVE", new Partition(6)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)), + new SystemStreamPartition("kafka", "PVE", new Partition(7)))) + .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)), + new SystemStreamPartition("kafka", "PVE", new Partition(4)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1)), + new SystemStreamPartition("kafka", "URE", new Partition(5)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2)), + new SystemStreamPartition("kafka", "URE", new Partition(6)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3)), + new SystemStreamPartition("kafka", "URE", new Partition(7)))) + .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0)), + new SystemStreamPartition("kafka", "URE", new Partition(4)))) + .build(); + + SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new MapConfig(), new GroupBySystemStreamPartition(new MapConfig())); + GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>()); + Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupBySystemStreamPartition.group(currSsps, grouperContext); + Assert.assertEquals(expectedGrouping, finalGrouping); } }