Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r158942184
--- Diff:
storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
---
@@ -114,27 +136,29 @@ public void ack(Tuple input) {
LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId,
delta, input);
}
BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
- boltAckInfo.applyOn(taskData.getUserContext());
- if (delta >= 0) {
+ boltAckInfo.applyOn(task.getUserContext());
+ if (delta != 0) {
((BoltExecutorStats) executor.getStats()).boltAckedTuple(
input.getSourceComponent(), input.getSourceStreamId(),
delta);
}
}
@Override
public void fail(Tuple input) {
+ if(!ackingEnabled)
+ return;
Set<Long> roots = input.getMessageId().getAnchors();
for (Long root : roots) {
- executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
+ task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
new Values(root), executor.getExecutorTransfer());
}
long delta = tupleTimeDelta((TupleImpl) input);
if (isDebug) {
LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId,
delta, input);
}
BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
- boltFailInfo.applyOn(taskData.getUserContext());
- if (delta >= 0) {
+ boltFailInfo.applyOn(task.getUserContext());
+ if (delta != 0) {
--- End diff --
The value of delta would be less than 0 when the tuple is not for sample.
So to be precise, we should include 0, hence `(delta >= 0)` instead of `(delta
!= 0)`.
---