Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2726#discussion_r200456703 --- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java --- @@ -140,6 +140,42 @@ public void testPartitionManagerRecreate() throws Exception { } } + @Test + public void testPartitionManagerRecreateMultipleTopics() throws Exception { + List<ZkCoordinator> coordinatorList = buildCoordinators(1); + + when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList( + TestUtils.buildPartitionInfo("topic1", 1, 9092), + TestUtils.buildPartitionInfo("topic2", 1, 9092))); + + List<PartitionManager> partitionManagersBeforeRefresh = coordinatorList.get(0).getMyManagedPartitions(); + assertEquals(2, partitionManagersBeforeRefresh.size()); + for (PartitionManager partitionManager : partitionManagersBeforeRefresh) { + partitionManager._emittedToOffset = 100L; + partitionManager._committedTo = 100L; + } + + waitForRefresh(); + + when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList( + TestUtils.buildPartitionInfo("topic1", 1, 9093), + TestUtils.buildPartitionInfo("topic3", 1, 9093))); --- End diff -- Nit: Seems weird to exclude topic 2, that can't happen in a real setup. Shouldn't you be checking that topic1 and topic2 can't overwrite each others' offsets instead (e.g by making them different a few lines up)?
---