Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2726#discussion_r200657323
--- Diff:
external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
@@ -140,6 +140,42 @@ public void testPartitionManagerRecreate() throws
Exception {
}
}
+ @Test
+ public void testPartitionManagerRecreateMultipleTopics() throws
Exception {
+ List<ZkCoordinator> coordinatorList = buildCoordinators(1);
+
+
when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
+ TestUtils.buildPartitionInfo("topic1", 1, 9092),
+ TestUtils.buildPartitionInfo("topic2", 1, 9092)));
+
+ List<PartitionManager> partitionManagersBeforeRefresh =
coordinatorList.get(0).getMyManagedPartitions();
+ assertEquals(2, partitionManagersBeforeRefresh.size());
+ for (PartitionManager partitionManager :
partitionManagersBeforeRefresh) {
+ partitionManager._emittedToOffset = 100L;
+ partitionManager._committedTo = 100L;
+ }
+
+ waitForRefresh();
+
+
when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
+ TestUtils.buildPartitionInfo("topic1", 1, 9093),
+ TestUtils.buildPartitionInfo("topic3", 1, 9093)));
--- End diff --
Thanks, it makes sense. Since there's only one task, topic 2 can't be
removed though. It's not really important, the test makes sense regardless.
---