Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2453#discussion_r156519020
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -625,4 +620,21 @@ public String toString() {
private String getTopicsString() {
return kafkaSpoutConfig.getSubscription().getTopicsString();
}
+
+ private static class PollablePartitionInfo {
+ private final Set<TopicPartition> pollablePartitions;
+ //The subset of earliest retriable offsets that are on pollable
partitions
+ private final Map<TopicPartition, Long>
pollableEarliestRetriableOffsets;
+
+ public PollablePartitionInfo(Set<TopicPartition>
pollablePartitions, Map<TopicPartition, Long> earliestRetriableOffsets) {
+ this.pollablePartitions = pollablePartitions;
+ this.pollableEarliestRetriableOffsets =
earliestRetriableOffsets.entrySet().stream()
+ .filter(entry ->
pollablePartitions.contains(entry.getKey()))
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry
-> entry.getValue()));
+ }
+
+ public boolean isPollAllowed() {
--- End diff --
Building on the comment above, I would still call it something along the
lines `isReadyToPoll()` or `isPollableRetryReady`, because that's what it is, a
retry that is ready to poll. However, it's up to you and I am +1 after this.
---