Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r158928999
--- Diff:
storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
---
@@ -114,27 +136,29 @@ public void ack(Tuple input) {
LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId,
delta, input);
}
BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
- boltAckInfo.applyOn(taskData.getUserContext());
- if (delta >= 0) {
+ boltAckInfo.applyOn(task.getUserContext());
+ if (delta != 0) {
((BoltExecutorStats) executor.getStats()).boltAckedTuple(
input.getSourceComponent(), input.getSourceStreamId(),
delta);
}
}
@Override
public void fail(Tuple input) {
+ if(!ackingEnabled)
+ return;
Set<Long> roots = input.getMessageId().getAnchors();
for (Long root : roots) {
- executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
+ task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
new Values(root), executor.getExecutorTransfer());
}
long delta = tupleTimeDelta((TupleImpl) input);
if (isDebug) {
LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId,
delta, input);
}
BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
- boltFailInfo.applyOn(taskData.getUserContext());
- if (delta >= 0) {
+ boltFailInfo.applyOn(task.getUserContext());
+ if (delta != 0) {
--- End diff --
Sorry, I don't understand. Can you please clarify?
---