GitHub user askprasanna commented on a diff in the pull request:
https://github.com/apache/storm/pull/2156#discussion_r121340307
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
---
@@ -64,6 +65,27 @@ public void addToAckMsgs(KafkaSpoutMessageId msgId) {
// O(Log N)
public void addToEmitMsgs(long offset) {
this.emittedOffsets.add(offset); // O(Log N)
}
+
+ public int getNumUncommittedOffsets() {
+ return this.emittedOffsets.size();
+ }
+
+ /**
+ * Gets the offset of the nth emitted message after the committed offset.
+ * Example: If the committed offset is 0 and offsets 1, 2, 8, 10 have been emitted,
+ * getNthUncommittedOffsetAfterCommittedOffset(3) returns 8.
+ *
+ * @param index The index of the message to get the offset for
+ * @return The offset
+ * @throws NoSuchElementException if the index is out of range
+ */
+ public long getNthUncommittedOffsetAfterCommittedOffset(int index) {
+ Iterator<Long> offsetIter = emittedOffsets.iterator();
+ for (int i = 0; i < index - 1; i++) {
+ offsetIter.next();
+ }
--- End diff --
How about calling toArray() on the set and then fetching the Nth offset
directly by array index? That trades some memory/garbage for speed. We could
possibly even pre-allocate an array of size maxUncommitted + batch size for
this purpose.
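
A rough sketch of what the toArray() idea could look like. It assumes
emittedOffsets is a sorted set such as a TreeSet<Long>, which the diff does
not show. The class name OffsetLookupSketch is hypothetical and only stands
in for OffsetManager. The index is treated as 1-based to match the Javadoc
example.

```java
import java.util.NoSuchElementException;
import java.util.TreeSet;

// Hypothetical stand-in for OffsetManager; only the parts needed for the lookup.
class OffsetLookupSketch {
    // Assumption: the real emittedOffsets is a sorted set of emitted offsets.
    private final TreeSet<Long> emittedOffsets = new TreeSet<>();

    public void addToEmitMsgs(long offset) {
        emittedOffsets.add(offset); // O(log N)
    }

    // Returns the nth (1-based) emitted offset by copying the set into an array.
    public long getNthUncommittedOffsetAfterCommittedOffset(int index) {
        if (index < 1 || index > emittedOffsets.size()) {
            throw new NoSuchElementException("Index out of range: " + index);
        }
        // toArray() on a TreeSet returns the elements in ascending order.
        Long[] snapshot = emittedOffsets.toArray(new Long[0]);
        return snapshot[index - 1];
    }

    public static void main(String[] args) {
        OffsetLookupSketch manager = new OffsetLookupSketch();
        for (long offset : new long[] {1L, 2L, 8L, 10L}) {
            manager.addToEmitMsgs(offset);
        }
        // Same inputs as the Javadoc example in the diff.
        System.out.println(manager.getNthUncommittedOffsetAfterCommittedOffset(3)); // prints 8
    }
}
```

Note that the copy itself is linear in the size of the set, so any gain would
come from reusing one snapshot (or a pre-allocated array) across several
lookups rather than from a single call.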