Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159960500
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
    @@ -225,51 +228,62 @@ private static String 
getExecutorType(WorkerTopologyContext workerTopologyContex
             }
         }
     
    +    public Queue<AddressedTuple> getPendingEmits() {
    +        return pendingEmits;
    +    }
    +
         /**
          * separated from mkExecutor in order to replace executor transfer in 
executor data for testing
          */
         public ExecutorShutdown execute() throws Exception {
             LOG.info("Loading executor tasks " + componentId + ":" + 
executorId);
     
    -        registerBackpressure();
    -        Utils.SmartThread systemThreads =
    -                Utils.asyncLoop(executorTransfer, 
executorTransfer.getName(), reportErrorDie);
    -
             String handlerName = componentId + "-executor" + executorId;
    -        Utils.SmartThread handlers =
    +        Utils.SmartThread handler =
                     Utils.asyncLoop(this, false, reportErrorDie, 
Thread.NORM_PRIORITY, true, true, handlerName);
             setupTicks(StatsUtil.SPOUT.equals(type));
     
             LOG.info("Finished loading executor " + componentId + ":" + 
executorId);
    -        return new ExecutorShutdown(this, 
Lists.newArrayList(systemThreads, handlers), idToTask);
    +        return new ExecutorShutdown(this, Lists.newArrayList(handler), 
idToTask);
         }
     
         public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws 
Exception;
     
    -    @SuppressWarnings("unchecked")
         @Override
    -    public void onEvent(Object event, long seq, boolean endOfBatch) throws 
Exception {
    -        ArrayList<AddressedTuple> addressedTuples = 
(ArrayList<AddressedTuple>) event;
    -        for (AddressedTuple addressedTuple : addressedTuples) {
    -            TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
    -            int taskId = addressedTuple.getDest();
    -            if (isDebug) {
    -                LOG.info("Processing received message FOR {} TUPLE: {}", 
taskId, tuple);
    -            }
    +    public void accept(Object event) {
    +        if (event == JCQueue.INTERRUPT) {
    +            throw new RuntimeException(new InterruptedException("JCQ 
processing interrupted") );
    +        }
    +        AddressedTuple addressedTuple =  (AddressedTuple)event;
    +        int taskId = addressedTuple.getDest();
    +
    +        TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
    +        if (isDebug) {
    +            LOG.info("Processing received message FOR {} TUPLE: {}", 
taskId, tuple);
    +        }
    +
    +        try {
                 if (taskId != AddressedTuple.BROADCAST_DEST) {
                     tupleActionFn(taskId, tuple);
                 } else {
                     for (Integer t : taskIds) {
                         tupleActionFn(t, tuple);
                     }
                 }
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    --- End diff --
    
    Where is this exception handled?


---

Reply via email to