Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1623#issuecomment-185903144

I think this is in okay shape, quite an improvement over the current version. There are some things I suggest re-examining:

  - Method `commitOffsets()` in `FlinkKafkaConsumer08:397` is missing the `@Override` annotation.
  - The "TODO: maybe add a check again here if we are still running." seems worth doing.
  - In many places, you do `if (!closableQueue.addIfOpen(...)) { error }`. Simply use the `add()` method, which errors if the queue is not open (see the first sketch at the end of this comment). Also (`LegacyFetcher:120`): there can never be an `IllegalStateException` while initially loading in the constructor.
  - I do not see the use of the list `deadBrokerThreads` in the legacy fetcher. Unless I am overlooking something, I would remove it and simplify the code.
  - Since the broker threads will eventually shut down anyway, I think you need not sync on them when leaving the `run()` method of the legacy fetcher.
  - The check whether a fetcher thread is alive should probably be `thread.getNewPartitionsQueue().isOpen()` rather than `thread.isAlive()`. That is the flag that is atomically changed and checked together with the adding of partitions.
  - The fetcher's main thread always blocks for 5 seconds before it can notice that the broker threads have shut down. I am wondering if we can make that more snappy by waking the main thread up when a fetcher thread terminates, e.g. by adding a marker element to the queue (sketched at the end of this comment).
  - Method `findLeaderForPartitions(...)` in the legacy fetcher also fails hard once it cannot find the leader for a partition (no retries). Is that intended?
  - The code uses `Integer.valueOf()` everywhere, creating a boxed integer that is then unboxed again. `Integer.parseInt(...)` is the preferable choice; I would drop `Integer.valueOf()` almost everywhere (see the last sketch below).
  - This loop (in `findLeaderForPartitions(...)`) is either too nifty for me to comprehend, or bogus:

```java
List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
List<FetchPartition> partitionsToAssignInternal = new ArrayList<>(partitionsToAssign);
Map<Node, List<FetchPartition>> leaderToPartitions = new HashMap<>();

for (KafkaTopicPartitionLeader partitionLeader : topicPartitionWithLeaderList) {
    if (partitionsToAssignInternal.size() == 0) {
        // we are done: all partitions are assigned
        break;
    }

    Iterator<FetchPartition> fpIter = partitionsToAssignInternal.iterator();
    while (fpIter.hasNext()) {
        FetchPartition fp = fpIter.next();
        if (fp.topic.equals(partitionLeader.getTopicPartition().getTopic())
                && fp.partition == partitionLeader.getTopicPartition().getPartition()) {
            // we found the leader for one of the fetch partitions
            Node leader = partitionLeader.getLeader();

            List<FetchPartition> partitionsOfLeader = leaderToPartitions.get(leader);
            if (partitionsOfLeader == null) {
                partitionsOfLeader = new ArrayList<>();
                leaderToPartitions.put(leader, partitionsOfLeader);
            }
            partitionsOfLeader.add(fp);
            fpIter.remove();
            break;
        }
    }
}
```

Does this do anything different than the version below? (The inner iteration always finds the exact same element at the first position and breaks.)
```java
List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
Map<Node, List<FetchPartition>> leaderToPartitions = new HashMap<>();

for (KafkaTopicPartitionLeader partitionLeader : topicPartitionWithLeaderList) {
    Node leader = partitionLeader.getLeader();

    List<FetchPartition> partitionsOfLeader = leaderToPartitions.get(leader);
    if (partitionsOfLeader == null) {
        partitionsOfLeader = new ArrayList<>();
        leaderToPartitions.put(leader, partitionsOfLeader);
    }
    // 'fp' stands for the FetchPartition matching partitionLeader's topic and
    // partition (the lookup from partitionsToAssign is elided in this sketch)
    partitionsOfLeader.add(fp);
}
```
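To illustrate the `addIfOpen()` point above, here is a minimal sketch of the two variants, assuming the `ClosableBlockingQueue` semantics from this PR (`add()` throws when the queue is closed, `addIfOpen()` returns `false`); the queue and element names are illustrative:

```java
// current pattern: check the boolean result of addIfOpen() and fail manually
if (!newPartitionsQueue.addIfOpen(fetchPartition)) {
    throw new IllegalStateException("Partition queue is already closed");
}

// suggested pattern: add() performs the same open-check atomically and
// throws an IllegalStateException itself if the queue has been closed
newPartitionsQueue.add(fetchPartition);
```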
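On the 5 second wake-up delay, a hedged sketch of the marker-element idea, using plain `java.util.concurrent` types since the actual queue between the broker threads and the main thread may differ; `TERMINATION_MARKER` and `eventQueue` are illustrative names, not from the PR:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class FetcherWakeupSketch {
    // sentinel object that broker threads enqueue when they terminate
    private static final Object TERMINATION_MARKER = new Object();

    private final BlockingQueue<Object> eventQueue = new LinkedBlockingQueue<>();

    void brokerThreadExit() {
        // called from a broker thread's finally-block on the way out
        eventQueue.add(TERMINATION_MARKER);
    }

    void mainLoopIteration() throws InterruptedException {
        // blocks until either work or a marker arrives - no fixed 5 second poll
        Object next = eventQueue.take();
        if (next == TERMINATION_MARKER) {
            // a broker thread shut down; re-check thread/partition state right away
        } else {
            // handle a regular queue element
        }
    }
}
```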
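And for the boxing point, the difference in a nutshell:

```java
String partitionString = "42";

// Integer.valueOf(...) returns a boxed Integer that is immediately unboxed again
int boxedThenUnboxed = Integer.valueOf(partitionString);

// Integer.parseInt(...) parses straight to the primitive - no boxing involved
int parsedDirectly = Integer.parseInt(partitionString);
```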