Github user choojoyq commented on a diff in the pull request:
https://github.com/apache/storm/pull/2726#discussion_r200655178
--- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
@@ -140,6 +140,42 @@ public void testPartitionManagerRecreate() throws Exception {
}
}
+ @Test
+ public void testPartitionManagerRecreateMultipleTopics() throws Exception {
+ List<ZkCoordinator> coordinatorList = buildCoordinators(1);
+
+ when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
+ TestUtils.buildPartitionInfo("topic1", 1, 9092),
+ TestUtils.buildPartitionInfo("topic2", 1, 9092)));
+
+ List<PartitionManager> partitionManagersBeforeRefresh = coordinatorList.get(0).getMyManagedPartitions();
+ assertEquals(2, partitionManagersBeforeRefresh.size());
+ for (PartitionManager partitionManager : partitionManagersBeforeRefresh) {
+ partitionManager._emittedToOffset = 100L;
+ partitionManager._committedTo = 100L;
+ }
+
+ waitForRefresh();
+
+ when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(
+ TestUtils.buildPartitionInfo("topic1", 1, 9093),
+ TestUtils.buildPartitionInfo("topic3", 1, 9093)));
--- End diff --
The idea of the test is that before rebalancing, some ``Task`` was responsible
for 2 partitions, from topics 1 and 2, on broker 9092, but after the Kafka
brokers were rebalanced it became responsible for the same partition of topic1
plus another partition from topic3, both now hosted on broker 9093. So the
``Task`` should preserve the partition information for topic1, since this task
managed it before and knows how many messages were already emitted, committed,
etc. The partition from topic3, on the other hand, is brand new, so it should be
created from scratch without reusing any information. Before the fix, topic3
would have reused the information from topic1, because the lookup of previously
managed partitions was performed only on the partition number, ignoring the
topic.
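A minimal sketch of why keying the lookup on the partition number alone goes wrong, assuming previously managed partition state can be modeled as a map from (topic, partition number) to a committed offset. `PartitionReuseSketch`, `PartitionKey`, `findByPartitionOnly` and `findByTopicAndPartition` are hypothetical names for illustration; they are not storm-kafka's actual classes or the code in this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class PartitionReuseSketch {

    // Hypothetical partition identity: topic name plus partition number.
    public static final class PartitionKey {
        final String topic;
        final int partition;

        public PartitionKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        // Equality must include the topic, otherwise topic1/1 and topic3/1 collide.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof PartitionKey)) return false;
            PartitionKey other = (PartitionKey) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Buggy lookup (illustrative): matches an old entry on partition number only,
    // so a brand-new topic3/partition 1 inherits the offset tracked for another topic.
    public static Long findByPartitionOnly(Map<PartitionKey, Long> committed, PartitionKey wanted) {
        for (Map.Entry<PartitionKey, Long> e : committed.entrySet()) {
            if (e.getKey().partition == wanted.partition) {
                return e.getValue();
            }
        }
        return null; // no previous state: the manager would start from scratch
    }

    // Fixed lookup (illustrative): topic and partition number must both match.
    public static Long findByTopicAndPartition(Map<PartitionKey, Long> committed, PartitionKey wanted) {
        return committed.get(wanted);
    }

    public static void main(String[] args) {
        // State before the refresh: partition 1 of topic1 and topic2, both committed to 100.
        Map<PartitionKey, Long> before = new LinkedHashMap<>();
        before.put(new PartitionKey("topic1", 1), 100L);
        before.put(new PartitionKey("topic2", 1), 100L);

        PartitionKey topic1 = new PartitionKey("topic1", 1);
        PartitionKey topic3 = new PartitionKey("topic3", 1);

        System.out.println(findByPartitionOnly(before, topic3));     // 100: wrongly reused
        System.out.println(findByTopicAndPartition(before, topic3)); // null: fresh manager
        System.out.println(findByTopicAndPartition(before, topic1)); // 100: state preserved
    }
}
```

Because every entry in the scenario has partition number 1, the number-only lookup hands topic3 whichever old entry it meets first (topic1 here, given `LinkedHashMap` insertion order), while the keyed lookup correctly returns nothing for topic3 and still finds topic1's state.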
---