Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r161358991
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -225,51 +228,62 @@ private static String
getExecutorType(WorkerTopologyContext workerTopologyContex
}
}
+ public Queue<AddressedTuple> getPendingEmits() {
+ return pendingEmits;
+ }
+
/**
* separated from mkExecutor in order to replace executor transfer in
executor data for testing
*/
public ExecutorShutdown execute() throws Exception {
LOG.info("Loading executor tasks " + componentId + ":" +
executorId);
- registerBackpressure();
- Utils.SmartThread systemThreads =
- Utils.asyncLoop(executorTransfer,
executorTransfer.getName(), reportErrorDie);
-
String handlerName = componentId + "-executor" + executorId;
- Utils.SmartThread handlers =
+ Utils.SmartThread handler =
Utils.asyncLoop(this, false, reportErrorDie,
Thread.NORM_PRIORITY, true, true, handlerName);
setupTicks(StatsUtil.SPOUT.equals(type));
LOG.info("Finished loading executor " + componentId + ":" +
executorId);
- return new ExecutorShutdown(this,
Lists.newArrayList(systemThreads, handlers), idToTask);
+ return new ExecutorShutdown(this, Lists.newArrayList(handler),
idToTask);
}
public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws
Exception;
- @SuppressWarnings("unchecked")
@Override
- public void onEvent(Object event, long seq, boolean endOfBatch) throws
Exception {
- ArrayList<AddressedTuple> addressedTuples =
(ArrayList<AddressedTuple>) event;
- for (AddressedTuple addressedTuple : addressedTuples) {
- TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
- int taskId = addressedTuple.getDest();
- if (isDebug) {
- LOG.info("Processing received message FOR {} TUPLE: {}",
taskId, tuple);
- }
+ public void accept(Object event) {
+ if (event == JCQueue.INTERRUPT) {
+ throw new RuntimeException(new InterruptedException("JCQ
processing interrupted") );
+ }
+ AddressedTuple addressedTuple = (AddressedTuple)event;
+ int taskId = addressedTuple.getDest();
+
+ TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+ if (isDebug) {
+ LOG.info("Processing received message FOR {} TUPLE: {}",
taskId, tuple);
+ }
+
+ try {
if (taskId != AddressedTuple.BROADCAST_DEST) {
tupleActionFn(taskId, tuple);
} else {
for (Integer t : taskIds) {
tupleActionFn(t, tuple);
}
}
+ } catch (Exception e) {
+ throw new RuntimeException(e);
--- End diff --
[Utils.asyncLoop()](https://github.com/roshannaik/storm/blob/STORM-2306-2/storm-client/src/jvm/org/apache/storm/utils/Utils.java#L351)
of the BoltExecutor or SpoutExecutor thread
---