GitHub user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159960326
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex
}
}
+ public Queue<AddressedTuple> getPendingEmits() {
+ return pendingEmits;
+ }
+
/**
* separated from mkExecutor in order to replace executor transfer in executor data for testing
*/
public ExecutorShutdown execute() throws Exception {
LOG.info("Loading executor tasks " + componentId + ":" + executorId);
- registerBackpressure();
- Utils.SmartThread systemThreads =
- Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
-
String handlerName = componentId + "-executor" + executorId;
- Utils.SmartThread handlers =
+ Utils.SmartThread handler =
Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
setupTicks(StatsUtil.SPOUT.equals(type));
LOG.info("Finished loading executor " + componentId + ":" + executorId);
- return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
+ return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask);
}
public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
- @SuppressWarnings("unchecked")
@Override
- public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
- ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
- for (AddressedTuple addressedTuple : addressedTuples) {
- TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
- int taskId = addressedTuple.getDest();
- if (isDebug) {
- LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
- }
+ public void accept(Object event) {
+ if (event == JCQueue.INTERRUPT) {
+ throw new RuntimeException(new InterruptedException("JCQ processing interrupted") );
--- End diff --
Could you explain a bit more about when JCQueue.INTERRUPT occurs and where this
exception is caught and handled? Almost everywhere else that we use an
InterruptedException, it is treated as a sign that the worker is going down in
an orderly manner, so the thread ends without complaining. I just want to be
sure that we don't accidentally shut down part of the worker in some odd cases.
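
To make the concern concrete, here is a minimal sketch of the pattern I have in mind. It is not the actual Storm code: `InterruptHandlingSketch`, its `INTERRUPT` sentinel, `causedByInterrupt`, and `runLoop` are all hypothetical stand-ins. The idea is that a loop which receives the wrapped exception has to unwrap the cause before it can treat it as an orderly shutdown.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Storm code: shows how a consumer loop could treat a
// wrapped InterruptedException as an orderly shutdown instead of an error.
class InterruptHandlingSketch {
    // Stand-in for the JCQueue.INTERRUPT sentinel used in the diff.
    static final Object INTERRUPT = new Object();

    // Mirrors accept() in the diff: the sentinel becomes a RuntimeException
    // whose cause is an InterruptedException.
    static void accept(Object event, AtomicInteger processed) {
        if (event == INTERRUPT) {
            throw new RuntimeException(new InterruptedException("JCQ processing interrupted"));
        }
        processed.incrementAndGet();
    }

    // Walks the cause chain looking for an InterruptedException.
    static boolean causedByInterrupt(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof InterruptedException) {
                return true;
            }
        }
        return false;
    }

    // Returns true if the loop ended quietly because of an interrupt;
    // any other RuntimeException is rethrown as a real error.
    static boolean runLoop(Object[] events, AtomicInteger processed) {
        try {
            for (Object event : events) {
                accept(event, processed);
            }
            return false;
        } catch (RuntimeException e) {
            if (causedByInterrupt(e)) {
                return true; // orderly shutdown: end the thread without complaining
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        AtomicInteger processed = new AtomicInteger();
        boolean quiet = runLoop(new Object[] {"a", "b", INTERRUPT, "c"}, processed);
        // prints "interrupted=true, processed=2"
        System.out.println("interrupted=" + quiet + ", processed=" + processed.get());
    }
}
```

If the real handler only checks `instanceof InterruptedException` on the top-level exception, the wrapped form thrown in this diff would slip past it and be reported as an error.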
---