Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r164971328
--- Diff:
storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
---
@@ -89,63 +107,80 @@ public void emitDirect(int taskId, String streamId,
Collection<Tuple> anchors, L
}
}
}
+ msgId = MessageId.makeId(anchorsToIds);
+ } else {
+ msgId = MessageId.makeUnanchored();
}
- MessageId msgId = MessageId.makeId(anchorsToIds);
- TupleImpl tupleExt = new
TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
- executor.getExecutorTransfer().transfer(t, tupleExt);
+ TupleImpl tupleExt = new
TupleImpl(executor.getWorkerTopologyContext(), values,
executor.getComponentId(), taskId, streamId, msgId);
+ xsfer.tryTransfer(new AddressedTuple(t, tupleExt),
executor.getPendingEmits());
}
if (isEventLoggers) {
- executor.sendToEventLogger(executor, taskData, values,
executor.getComponentId(), null, random);
+ task.sendToEventLogger(executor, values,
executor.getComponentId(), null, random, executor.getPendingEmits());
}
return outTasks;
}
@Override
public void ack(Tuple input) {
+ if(!ackingEnabled)
+ return;
long ackValue = ((TupleImpl) input).getAckVal();
Map<Long, Long> anchorsToIds =
input.getMessageId().getAnchorsToIds();
for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
- executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID,
+ task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID,
new Values(entry.getKey(),
Utils.bitXor(entry.getValue(), ackValue)),
- executor.getExecutorTransfer());
+ executor.getExecutorTransfer(),
executor.getPendingEmits());
}
long delta = tupleTimeDelta((TupleImpl) input);
if (isDebug) {
LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId,
delta, input);
}
- BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
- boltAckInfo.applyOn(taskData.getUserContext());
+
+ if ( !task.getUserContext().getHooks().isEmpty() ) {
+ BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId,
delta);
+ boltAckInfo.applyOn(task.getUserContext());
+ }
if (delta >= 0) {
- ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
- input.getSourceComponent(), input.getSourceStreamId(),
delta);
+ executor.getStats().boltAckedTuple(input.getSourceComponent(),
input.getSourceStreamId(), delta);
}
}
@Override
public void fail(Tuple input) {
+ if(!ackingEnabled)
+ return;
Set<Long> roots = input.getMessageId().getAnchors();
for (Long root : roots) {
- executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
- new Values(root), executor.getExecutorTransfer());
+ task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
+ new Values(root), executor.getExecutorTransfer(),
executor.getPendingEmits());
}
long delta = tupleTimeDelta((TupleImpl) input);
if (isDebug) {
LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId,
delta, input);
}
BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
- boltFailInfo.applyOn(taskData.getUserContext());
- if (delta >= 0) {
- ((BoltExecutorStats) executor.getStats()).boltFailedTuple(
- input.getSourceComponent(), input.getSourceStreamId(),
delta);
+ boltFailInfo.applyOn(task.getUserContext());
+ if (delta != 0) {
--- End diff --
Looks like missed spot : this should be `delta >= 0`.
https://github.com/apache/storm/pull/2241/files?diff=split#r158213916
```
(when (<= 0 delta)
(stats/bolt-failed-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
```
---