Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159960326
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
    @@ -225,51 +228,62 @@ private static String 
getExecutorType(WorkerTopologyContext workerTopologyContex
             }
         }
     
    +    public Queue<AddressedTuple> getPendingEmits() {
    +        return pendingEmits;
    +    }
    +
         /**
          * separated from mkExecutor in order to replace executor transfer in 
executor data for testing
          */
         public ExecutorShutdown execute() throws Exception {
             LOG.info("Loading executor tasks " + componentId + ":" + 
executorId);
     
    -        registerBackpressure();
    -        Utils.SmartThread systemThreads =
    -                Utils.asyncLoop(executorTransfer, 
executorTransfer.getName(), reportErrorDie);
    -
             String handlerName = componentId + "-executor" + executorId;
    -        Utils.SmartThread handlers =
    +        Utils.SmartThread handler =
                     Utils.asyncLoop(this, false, reportErrorDie, 
Thread.NORM_PRIORITY, true, true, handlerName);
             setupTicks(StatsUtil.SPOUT.equals(type));
     
             LOG.info("Finished loading executor " + componentId + ":" + 
executorId);
    -        return new ExecutorShutdown(this, 
Lists.newArrayList(systemThreads, handlers), idToTask);
    +        return new ExecutorShutdown(this, Lists.newArrayList(handler), 
idToTask);
         }
     
         public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws 
Exception;
     
    -    @SuppressWarnings("unchecked")
         @Override
    -    public void onEvent(Object event, long seq, boolean endOfBatch) throws 
Exception {
    -        ArrayList<AddressedTuple> addressedTuples = 
(ArrayList<AddressedTuple>) event;
    -        for (AddressedTuple addressedTuple : addressedTuples) {
    -            TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
    -            int taskId = addressedTuple.getDest();
    -            if (isDebug) {
    -                LOG.info("Processing received message FOR {} TUPLE: {}", 
taskId, tuple);
    -            }
    +    public void accept(Object event) {
    +        if (event == JCQueue.INTERRUPT) {
    +            throw new RuntimeException(new InterruptedException("JCQ 
processing interrupted") );
    --- End diff --
    
    Could you explain a bit more when JCQueue.INTERRUPT happens and where this 
exception is caught/handled?  Almost every other place that we use an 
InterruptedException it is treated as the worker is going down in an orderly 
manor so end the thread without complaining.  I just want to be sure that we 
don't accidentally shut down part of the worker in some odd cases.


---

Reply via email to