Github user choojoyq commented on a diff in the pull request:
https://github.com/apache/storm/pull/2756#discussion_r200963984
--- Diff:
external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java ---
@@ -80,22 +81,24 @@ public void refresh() {
List<Partition> mine =
KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);
Set<Partition> curr = _managers.keySet();
- Set<Partition> newPartitions = new HashSet<Partition>(mine);
+ Set<Partition> newPartitions = new HashSet<>(mine);
newPartitions.removeAll(curr);
- Set<Partition> deletedPartitions = new
HashSet<Partition>(curr);
+ Set<Partition> deletedPartitions = new HashSet<>(curr);
deletedPartitions.removeAll(mine);
- LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition
managers: " + deletedPartitions.toString());
+ LOG.info("{}Deleted partition managers: {}",
+ taskId(_taskIndex, _totalTasks), deletedPartitions);
- Map<Integer, PartitionManager> deletedManagers = new
HashMap<>();
+ Map<TopicAndPartition, PartitionManager> deletedManagers = new
HashMap<>();
for (Partition id : deletedPartitions) {
- deletedManagers.put(id.partition, _managers.remove(id));
+ PartitionManager manager = _managers.remove(id);
+ manager.close();
+ deletedManagers.put(new TopicAndPartition(id.topic,
id.partition), manager);
}
- for (PartitionManager manager : deletedManagers.values()) {
- if (manager != null) manager.close();
- }
- LOG.info(taskId(_taskIndex, _totalTasks) + "New partition
managers: " + newPartitions.toString());
+
+ LOG.info("{}New partition managers: {}",
--- End diff --
This space is added in ``taskId`` method.
``return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] ";``
---