Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1623#issuecomment-185903144
  
    I think this is in okay shape, quite an improvement over the current 
version.
    
    There are some things I suggest re-examining:
    
    
    The method `commitOffsets()` in `FlinkKafkaConsumer08:397` is missing the 
`@Override` annotation.
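    
    A sketch of the fix (the signature here is illustrative, not copied from the PR):
    
    ```java
    @Override
    protected void commitOffsets(HashMap<KafkaTopicPartition, Long> offsetsToCommit) throws Exception {
        // body unchanged; @Override just makes the compiler verify that a
        // base-class or interface method is actually being overridden
    }
    ```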
    
    The "TODO: maybe add a check again here if we are still running." seems 
worth doing
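    
    A sketch of what that check could look like (assuming the surrounding class 
has a `volatile boolean running` flag that is cleared on shutdown):
    
    ```java
    // re-check the shutdown flag before continuing with the next step
    if (!running) {
        // the consumer was shut down in the meantime; bail out quietly
        return;
    }
    ```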
    
    In many places, you do `if (!closableQueue.addIfOpen(...)) { error }`. 
Simply use the `add()` method, which errors if not open. Also, at 
`LegacyFetcher:120`, there can never be an `IllegalStateException` while 
initially loading in the constructor.
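    
    For illustration, a sketch of the simplification (queue and element names 
are made up):
    
    ```java
    // before: manual open-check with a hand-rolled error
    if (!newPartitionsQueue.addIfOpen(fetchPartition)) {
        throw new IllegalStateException("Queue was closed");
    }
    
    // after: add() itself throws IllegalStateException if the queue is closed
    newPartitionsQueue.add(fetchPartition);
    ```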
    
    I do not see the use of the list `deadBrokerThreads` in the legacy fetcher. 
Unless I am overlooking something, I would remove it to simplify the code. Since 
the threads will eventually shut down anyway, I think you need not sync on them 
when leaving the `run()` method of the legacy fetcher.
    
    The check whether a fetcher thread is alive should probably be 
`thread.getNewPartitionsQueue().isOpen()` rather than `thread.isAlive()`. That 
is the flag that is changed and checked atomically with the adding of 
partitions.
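    
    A sketch of that pattern (only `getNewPartitionsQueue()` is from the PR; the 
thread type, the map, and the thread-creation helper are illustrative). Since 
the open flag is checked atomically as part of the add, the add itself can 
drive the decision:
    
    ```java
    SimpleConsumerThread thread = brokerToThread.get(leader);
    if (thread == null || !thread.getNewPartitionsQueue().addIfOpen(fp)) {
        // no thread for this broker yet, or its queue was closed between the
        // lookup and the add: start a fresh consumer thread and assign there
        thread = createAndStartConsumerThread(leader);
        brokerToThread.put(leader, thread);
        thread.getNewPartitionsQueue().add(fp);
    }
    ```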
    
    The fetcher's main thread always blocks for 5 seconds before it can notice 
that the broker threads have shut down.
    I am wondering if we can make that snappier by waking the main thread up 
when a fetcher thread terminates (by adding a marker element to the queue).
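    
    One possible shape for that, as a rough sketch (the queue and marker names 
are invented here):
    
    ```java
    // a unique marker element meaning "a fetcher thread has terminated"
    private static final Object THREAD_DEATH_MARKER = new Object();
    private final BlockingQueue<Object> wakeupQueue = new LinkedBlockingQueue<>();
    
    // in each fetcher thread, as the last action of run():
    wakeupQueue.add(THREAD_DEATH_MARKER);
    
    // in the main thread, instead of an unconditional 5 second block:
    Object signal = wakeupQueue.poll(5, TimeUnit.SECONDS);
    if (signal == THREAD_DEATH_MARKER) {
        // a fetcher thread just terminated; re-examine the threads right away
    }
    ```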
    
    
    Method `findLeaderForPartitions(...)` in the legacy fetcher also fails hard 
once it cannot find the leader for a partition (no retries). Is that intended?
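    
    If retrying is intended, a minimal sketch (retry count and backoff are 
made-up values; the call is abbreviated):
    
    ```java
    final int maxRetries = 3; // made-up value
    for (int attempt = 1; ; attempt++) {
        try {
            return findLeaderForPartitions(partitionsToAssign, kafkaConfig);
        }
        catch (Exception e) {
            if (attempt >= maxRetries) {
                // still fail hard, but only after a few attempts
                throw e;
            }
            // simple linear backoff; the enclosing method would declare
            // throws Exception, which also covers the InterruptedException
            Thread.sleep(1000L * attempt);
        }
    }
    ```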
    
    The code uses `Integer.valueOf()` everywhere, creating a boxed Integer and 
then unboxing it. `Integer.parseInt(...)` is the preferable choice (almost 
always; I would drop `Integer.valueOf()` entirely).
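    
    For example (the property name is only for illustration):
    
    ```java
    // boxes to an Integer that is immediately unboxed to a primitive again
    int soTimeoutBoxed = Integer.valueOf(props.getProperty("socket.timeout.ms"));
    
    // parses straight to the primitive int, without any boxing
    int soTimeout = Integer.parseInt(props.getProperty("socket.timeout.ms"));
    ```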
    
    This loop (in `findLeaderForPartitions(...)`) is either too nifty for me to 
comprehend, or bogus:
    
    ```java
    List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
    List<FetchPartition> partitionsToAssignInternal = new ArrayList<>(partitionsToAssign);
    
    Map<Node, List<FetchPartition>> leaderToPartitions = new HashMap<>();
    
    for (KafkaTopicPartitionLeader partitionLeader : topicPartitionWithLeaderList) {
        if (partitionsToAssignInternal.size() == 0) {
            // we are done: all partitions are assigned
            break;
        }
        Iterator<FetchPartition> fpIter = partitionsToAssignInternal.iterator();
        while (fpIter.hasNext()) {
            FetchPartition fp = fpIter.next();
            if (fp.topic.equals(partitionLeader.getTopicPartition().getTopic())
                    && fp.partition == partitionLeader.getTopicPartition().getPartition()) {
    
                // we found the leader for one of the fetch partitions
                Node leader = partitionLeader.getLeader();
                List<FetchPartition> partitionsOfLeader = leaderToPartitions.get(leader);
                if (partitionsOfLeader == null) {
                    partitionsOfLeader = new ArrayList<>();
                    leaderToPartitions.put(leader, partitionsOfLeader);
                }
                partitionsOfLeader.add(fp);
                fpIter.remove();
                break;
            }
        }
    }
    ```
    Does this do anything different from the version below? (The internal 
iteration always finds the exact same element at the first position and 
breaks, so a direct lookup should do the same job.)
    ```java
    List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
    Map<Node, List<FetchPartition>> leaderToPartitions = new HashMap<>();
    
    // index the requested partitions by topic/partition for a direct lookup
    Map<KafkaTopicPartition, FetchPartition> toAssign = new HashMap<>();
    for (FetchPartition fp : partitionsToAssign) {
        toAssign.put(new KafkaTopicPartition(fp.topic, fp.partition), fp);
    }
    
    for (KafkaTopicPartitionLeader partitionLeader : topicPartitionWithLeaderList) {
        FetchPartition fp = toAssign.remove(partitionLeader.getTopicPartition());
        if (fp == null) {
            continue; // no fetch partition was requested for this topic/partition
        }
        Node leader = partitionLeader.getLeader();
        List<FetchPartition> partitionsOfLeader = leaderToPartitions.get(leader);
        if (partitionsOfLeader == null) {
            partitionsOfLeader = new ArrayList<>();
            leaderToPartitions.put(leader, partitionsOfLeader);
        }
        partitionsOfLeader.add(fp);
    }
    ```

