Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r164971328 --- Diff: storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java --- @@ -89,63 +107,80 @@ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, L } } } + msgId = MessageId.makeId(anchorsToIds); + } else { + msgId = MessageId.makeUnanchored(); } - MessageId msgId = MessageId.makeId(anchorsToIds); - TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId); - executor.getExecutorTransfer().transfer(t, tupleExt); + TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), taskId, streamId, msgId); + xsfer.tryTransfer(new AddressedTuple(t, tupleExt), executor.getPendingEmits()); } if (isEventLoggers) { - executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random); + task.sendToEventLogger(executor, values, executor.getComponentId(), null, random, executor.getPendingEmits()); } return outTasks; } @Override public void ack(Tuple input) { + if(!ackingEnabled) + return; long ackValue = ((TupleImpl) input).getAckVal(); Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds(); for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) { - executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID, + task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID, new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)), - executor.getExecutorTransfer()); + executor.getExecutorTransfer(), executor.getPendingEmits()); } long delta = tupleTimeDelta((TupleImpl) input); if (isDebug) { LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input); } - BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta); - boltAckInfo.applyOn(taskData.getUserContext()); + + if ( !task.getUserContext().getHooks().isEmpty() ) { + BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta); + boltAckInfo.applyOn(task.getUserContext()); + } if (delta >= 0) { - ((BoltExecutorStats) executor.getStats()).boltAckedTuple( - input.getSourceComponent(), input.getSourceStreamId(), delta); + executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta); } } @Override public void fail(Tuple input) { + if(!ackingEnabled) + return; Set<Long> roots = input.getMessageId().getAnchors(); for (Long root : roots) { - executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID, - new Values(root), executor.getExecutorTransfer()); + task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID, + new Values(root), executor.getExecutorTransfer(), executor.getPendingEmits()); } long delta = tupleTimeDelta((TupleImpl) input); if (isDebug) { LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input); } BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta); - boltFailInfo.applyOn(taskData.getUserContext()); - if (delta >= 0) { - ((BoltExecutorStats) executor.getStats()).boltFailedTuple( - input.getSourceComponent(), input.getSourceStreamId(), delta); + boltFailInfo.applyOn(task.getUserContext()); + if (delta != 0) { --- End diff -- Looks like missed spot : this should be `delta >= 0`. https://github.com/apache/storm/pull/2241/files?diff=split#r158213916 ``` (when (<= 0 delta) (stats/bolt-failed-tuple! executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) ```
---