Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r164971328
  
    --- Diff: 
storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
 ---
    @@ -89,63 +107,80 @@ public void emitDirect(int taskId, String streamId, 
Collection<Tuple> anchors, L
                             }
                         }
                     }
    +                msgId = MessageId.makeId(anchorsToIds);
    +            } else {
    +                msgId = MessageId.makeUnanchored();
                 }
    -            MessageId msgId = MessageId.makeId(anchorsToIds);
    -            TupleImpl tupleExt = new 
TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
    -            executor.getExecutorTransfer().transfer(t, tupleExt);
    +            TupleImpl tupleExt = new 
TupleImpl(executor.getWorkerTopologyContext(), values, 
executor.getComponentId(), taskId, streamId, msgId);
    +            xsfer.tryTransfer(new AddressedTuple(t, tupleExt), 
executor.getPendingEmits());
             }
             if (isEventLoggers) {
    -            executor.sendToEventLogger(executor, taskData, values, 
executor.getComponentId(), null, random);
    +            task.sendToEventLogger(executor, values, 
executor.getComponentId(), null, random, executor.getPendingEmits());
             }
             return outTasks;
         }
     
         @Override
         public void ack(Tuple input) {
    +        if(!ackingEnabled)
    +            return;
             long ackValue = ((TupleImpl) input).getAckVal();
             Map<Long, Long> anchorsToIds = 
input.getMessageId().getAnchorsToIds();
             for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
    -            executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID,
    +            task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID,
                         new Values(entry.getKey(), 
Utils.bitXor(entry.getValue(), ackValue)),
    -                    executor.getExecutorTransfer());
    +                    executor.getExecutorTransfer(), 
executor.getPendingEmits());
             }
             long delta = tupleTimeDelta((TupleImpl) input);
             if (isDebug) {
                 LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, 
delta, input);
             }
    -        BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
    -        boltAckInfo.applyOn(taskData.getUserContext());
    +
    +        if ( !task.getUserContext().getHooks().isEmpty() ) {
    +            BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, 
delta);
    +            boltAckInfo.applyOn(task.getUserContext());
    +        }
             if (delta >= 0) {
    -            ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
    -                    input.getSourceComponent(), input.getSourceStreamId(), 
delta);
    +            executor.getStats().boltAckedTuple(input.getSourceComponent(), 
input.getSourceStreamId(), delta);
             }
         }
     
         @Override
         public void fail(Tuple input) {
    +        if(!ackingEnabled)
    +            return;
             Set<Long> roots = input.getMessageId().getAnchors();
             for (Long root : roots) {
    -            executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
    -                    new Values(root), executor.getExecutorTransfer());
    +            task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
    +                    new Values(root), executor.getExecutorTransfer(), 
executor.getPendingEmits());
             }
             long delta = tupleTimeDelta((TupleImpl) input);
             if (isDebug) {
                 LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, 
delta, input);
             }
             BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
    -        boltFailInfo.applyOn(taskData.getUserContext());
    -        if (delta >= 0) {
    -            ((BoltExecutorStats) executor.getStats()).boltFailedTuple(
    -                    input.getSourceComponent(), input.getSourceStreamId(), 
delta);
    +        boltFailInfo.applyOn(task.getUserContext());
    +        if (delta != 0) {
    --- End diff --
    
    Looks like missed spot : this should be `delta >= 0`.
    
    https://github.com/apache/storm/pull/2241/files?diff=split#r158213916
    
    ```
     (when (<= 0 delta) 
       (stats/bolt-failed-tuple! executor-stats 
                                 (.getSourceComponent tuple) 
                                 (.getSourceStreamId tuple) 
                                 delta)))) 
    ```


---

Reply via email to