Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2596#discussion_r176908038
--- Diff:
examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
---
@@ -92,20 +173,37 @@ public void nextTuple() {
}
count++;
total++;
- if (count > 1000) {
+ if (count > PENDING_COUNT_MAX) {
count = 0;
- System.out.println("Pending count: " + this.pending.size()
+ ", total: " + this.total);
+ System.out.println("Pending count: " + this.pending.size()
+ + ", total: " + this.total);
}
Thread.yield();
}
- public void ack(Object msgId) {
+ /**
+ * Acknowledges the message with id {@code msgId}.
+ * @param msgId the message id
+ */
+ @Override
+ public void ack(final Object msgId) {
this.pending.remove(msgId);
}
- public void fail(Object msgId) {
+ /**
+ * Marks the message with id {@code msgId} as failed.
+ * @param msgId the message id
+ */
+ @Override
+ public void fail(final Object msgId) {
System.out.println("**** RESENDING FAILED TUPLE");
this.collector.emit(this.pending.get(msgId), msgId);
}
}
+
+ /**
+ * Utility constructor.
--- End diff --
What does this mean/what is this for?
---