Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2726#discussion_r200456703
  
    --- Diff: 
external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
    @@ -140,6 +140,42 @@ public void testPartitionManagerRecreate() throws 
Exception {
             }
         }
     
    +    @Test
    +    public void testPartitionManagerRecreateMultipleTopics() throws 
Exception {
    +        List<ZkCoordinator> coordinatorList = buildCoordinators(1);
    +
    +        
when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
    +            TestUtils.buildPartitionInfo("topic1", 1, 9092),
    +            TestUtils.buildPartitionInfo("topic2", 1, 9092)));
    +
    +        List<PartitionManager> partitionManagersBeforeRefresh = 
coordinatorList.get(0).getMyManagedPartitions();
    +        assertEquals(2, partitionManagersBeforeRefresh.size());
    +        for (PartitionManager partitionManager : 
partitionManagersBeforeRefresh) {
    +            partitionManager._emittedToOffset = 100L;
    +            partitionManager._committedTo = 100L;
    +        }
    +
    +        waitForRefresh();
    +
    +        
when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
    +            TestUtils.buildPartitionInfo("topic1", 1, 9093),
    +            TestUtils.buildPartitionInfo("topic3", 1, 9093)));
    --- End diff --
    
    Nit: Seems weird to exclude topic 2, that can't happen in a real setup. 
Shouldn't you be checking that topic1 and topic2 can't overwrite each others' 
offsets instead (e.g by making them different a few lines up)?


---

Reply via email to