GitHub user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159960500 --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java --- @@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex } } + public Queue<AddressedTuple> getPendingEmits() { + return pendingEmits; + } + /** * separated from mkExecutor in order to replace executor transfer in executor data for testing */ public ExecutorShutdown execute() throws Exception { LOG.info("Loading executor tasks " + componentId + ":" + executorId); - registerBackpressure(); - Utils.SmartThread systemThreads = - Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie); - String handlerName = componentId + "-executor" + executorId; - Utils.SmartThread handlers = + Utils.SmartThread handler = Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName); setupTicks(StatsUtil.SPOUT.equals(type)); LOG.info("Finished loading executor " + componentId + ":" + executorId); - return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask); + return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask); } public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception; - @SuppressWarnings("unchecked") @Override - public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception { - ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event; - for (AddressedTuple addressedTuple : addressedTuples) { - TupleImpl tuple = (TupleImpl) addressedTuple.getTuple(); - int taskId = addressedTuple.getDest(); - if (isDebug) { - LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple); - } + public void accept(Object event) { + if (event == JCQueue.INTERRUPT) { + throw new RuntimeException(new InterruptedException("JCQ processing interrupted") ); + } + AddressedTuple addressedTuple = 
(AddressedTuple)event; + int taskId = addressedTuple.getDest(); + + TupleImpl tuple = (TupleImpl) addressedTuple.getTuple(); + if (isDebug) { + LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple); + } + + try { if (taskId != AddressedTuple.BROADCAST_DEST) { tupleActionFn(taskId, tuple); } else { for (Integer t : taskIds) { tupleActionFn(t, tuple); } } + } catch (Exception e) { + throw new RuntimeException(e); --- End diff -- Where is this exception handled?
---