[ https://issues.apache.org/jira/browse/STORM-3214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16606876#comment-16606876 ]
Jing Chen commented on STORM-3214: ---------------------------------- [~sj] thanks for the catch, I believe it has been fixed at 1.1.x-branch, please have a look at: [https://github.com/apache/storm/blob/1.1.x-branch/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java#L93] Also, storm-kafka has been deprecated and will be removed in a future storm release after 1.x version. Please upgrade to storm-kafka-client. FYI, [https://github.com/apache/storm/tree/master/external/storm-kafka-client] > 使用 kafka.topic.wildcard.match > =true的时候,ZkCoordinator.refresh中deletedManagers会出现逻辑错误 > ----------------------------------------------------------------------------------- > > Key: STORM-3214 > URL: https://issues.apache.org/jira/browse/STORM-3214 > Project: Apache Storm > Issue Type: Bug > Components: storm-kafka > Affects Versions: 1.1.3, 1.2.2 > Reporter: shusj > Priority: Major > Original Estimate: 2h > Remaining Estimate: 2h > > 使用 kafka.topic.wildcard.match > =true的时候,如果topic数目大于1,ZkCoordinator.refresh中deletedManagers会出现逻辑错误 > 只需要将ZkCoordinator@L91: Map<Integer, PartitionManager> deletedManagers = > new HashMap<>(); > 将key修改为topic+partition > > 在org.apache.storm.kafka.ZkCoordinatorTest中添加了如下测试 > {code:java} > //代码占位符 > public static GlobalPartitionInformation buildPartitionInfo(int > numPartitions, int brokerPort, String topic) { > GlobalPartitionInformation globalPartitionInformation = new > GlobalPartitionInformation(topic); > for (int i = 0; i < numPartitions; i++) { > globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + > " :" + brokerPort)); > } > return globalPartitionInformation; > } > @Test > public void testTwoTopicPartitionsChange() throws Exception { > int numPartitions = 2; > int partitionsPerTask = 1; > final Set<Partition> unregisterList = new HashSet<>(); > Mockito.doAnswer(new Answer() { > @Override > public Object answer(InvocationOnMock invocation) throws Throwable { > Object[] arguments = invocation.getArguments(); > Partition partition = new Partition((Broker) arguments[0], (String) > arguments[1], (int) arguments[2], false); > unregisterList.add(partition); > return null; > } > }).when(dynamicPartitionConnections).unregister(any(Broker.class), > any(String.class), anyInt()); > List<ZkCoordinator> coordinatorList = buildCoordinators(partitionsPerTask); > ArrayList<GlobalPartitionInformation> prePartitionInformations = > Lists.newArrayList(buildPartitionInfo(numPartitions, 9092, "TOPIC1"), > buildPartitionInfo(numPartitions, 9092, "TOPIC2")); > when(reader.getBrokerInfo()).thenReturn(prePartitionInformations); > List<List<PartitionManager>> partitionManagersBeforeRefresh = > getPartitionManagers(coordinatorList); > waitForRefresh(); > when(reader.getBrokerInfo()).thenReturn(Lists.newArrayList(buildPartitionInfo(numPartitions, > 9093, "TOPIC1"), buildPartitionInfo(numPartitions, 9093, "TOPIC2"))); > List<List<PartitionManager>> partitionManagersAfterRefresh = > getPartitionManagers(coordinatorList); > List<Partition> allPrePartition = > KafkaUtils.calculatePartitionsForTask(prePartitionInformations, 1, 0, 0); > assertEquals(unregisterList.size(), allPrePartition.size()); > for (Partition partition : allPrePartition) { > assertTrue(unregisterList.contains(partition)); > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)