Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2453#discussion_r156508850
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -625,4 +620,21 @@ public String toString() {
private String getTopicsString() {
return kafkaSpoutConfig.getSubscription().getTopicsString();
}
+
+ private static class PollablePartitionInfo {
+ private final Set<TopicPartition> pollablePartitions;
+ //The subset of earliest retriable offsets that are on pollable
partitions
+ private final Map<TopicPartition, Long>
pollableEarliestRetriableOffsets;
+
+ public PollablePartitionInfo(Set<TopicPartition>
pollablePartitions, Map<TopicPartition, Long> earliestRetriableOffsets) {
+ this.pollablePartitions = pollablePartitions;
+ this.pollableEarliestRetriableOffsets =
earliestRetriableOffsets.entrySet().stream()
+ .filter(entry ->
pollablePartitions.contains(entry.getKey()))
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry
-> entry.getValue()));
+ }
+
+ public boolean isPollAllowed() {
--- End diff --
I agree that allowed can be misunderstood. I'd prefer not calling it
isEmpty, since this isn't a collection and it's not obvious which internal
collection it would be referring to. How about shouldPoll?
---