GitHub user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2453#discussion_r156499862
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -625,4 +620,21 @@ public String toString() {
         private String getTopicsString() {
             return kafkaSpoutConfig.getSubscription().getTopicsString();
         }
    +    
    +    private static class PollablePartitionInfo {
    +        private final Set<TopicPartition> pollablePartitions;
    +        //The subset of earliest retriable offsets that are on pollable partitions
    +        private final Map<TopicPartition, Long> pollableEarliestRetriableOffsets;
    +        
    +        public PollablePartitionInfo(Set<TopicPartition> pollablePartitions, Map<TopicPartition, Long> earliestRetriableOffsets) {
    +            this.pollablePartitions = pollablePartitions;
    +            this.pollableEarliestRetriableOffsets = earliestRetriableOffsets.entrySet().stream()
    +                .filter(entry -> pollablePartitions.contains(entry.getKey()))
    +                .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
    +        }
    +        
    +        public boolean isPollAllowed() {
    --- End diff --
    
    Why not simply `isEmpty()`?
    
    I really don't think that _allowed_ is a suitable name here because we 
are not checking for permissions or anything like that. If a partition is in 
the pollable set, polling it is always allowed.
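
    To illustrate the suggestion, here is a minimal sketch of what the rename could look like. The body of `isPollAllowed()` is not shown in the diff, so this assumes it simply reports whether the pollable set is non-empty; `isEmpty()` would then be its negation. The `TopicPartition` class below is a hypothetical stand-in for the Kafka class so the sketch compiles on its own, and the getter name is also an assumption.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
// included only so this sketch compiles without the Kafka client jar.
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TopicPartition)) {
            return false;
        }
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

final class PollablePartitionInfo {
    private final Set<TopicPartition> pollablePartitions;
    // The subset of earliest retriable offsets that are on pollable partitions
    private final Map<TopicPartition, Long> pollableEarliestRetriableOffsets;

    PollablePartitionInfo(Set<TopicPartition> pollablePartitions,
                          Map<TopicPartition, Long> earliestRetriableOffsets) {
        this.pollablePartitions = pollablePartitions;
        this.pollableEarliestRetriableOffsets = earliestRetriableOffsets.entrySet().stream()
            .filter(entry -> pollablePartitions.contains(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // Assumed replacement for isPollAllowed(): true when there is nothing to poll.
    // Callers would write `if (!info.isEmpty())` instead of `if (info.isPollAllowed())`.
    boolean isEmpty() {
        return pollablePartitions.isEmpty();
    }

    // Hypothetical accessor, added here only so the filtered map can be inspected.
    Map<TopicPartition, Long> getPollableEarliestRetriableOffsets() {
        return pollableEarliestRetriableOffsets;
    }
}
```

    With this naming the method describes the state of the collection rather than implying a permission check, which is the concern raised above.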

