GitHub user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r129479713
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -366,31 +330,42 @@ protected void setupTicks(boolean isSpout) {
@Override
public void run() {
TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
- (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
- List<AddressedTuple> tickTuple =
- Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
- receiveQueue.publish(tickTuple);
+ Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
+ AddressedTuple tickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+ try {
+ receiveQueue.publish(tickTuple);
+ receiveQueue.flush();
+ } catch (InterruptedException e) {
+ LOG.warn("Thread interrupted when emitting tick tuple. Setting interrupt flag.");
+ Thread.currentThread().interrupt();
+ return;
+ }
}
});
}
}
}
-
- private DisruptorQueue mkExecutorBatchQueue(Map<String, Object> topoConf, List<Long> executorId) {
- int sendSize = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
- int waitTimeOutMs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS));
- int batchSize = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE));
- int batchTimeOutMs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
- return new DisruptorQueue("executor" + executorId + "-send-queue", ProducerType.SINGLE,
- sendSize, waitTimeOutMs, batchSize, batchTimeOutMs);
+ // Called by flush-tuple-timer thread
+ public boolean publishFlushTuple() {
+ TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(), Constants.SYSTEM_COMPONENT_ID,
+ (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_FLUSH_STREAM_ID);
+ AddressedTuple flushTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
--- End diff --
Does the flush tuple need to be constructed anew on each call to
publishFlushTuple()? If its value is fixed, you could build it once and keep
it in a static final field.
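
To illustrate the idea, here is a rough sketch that uses stand-in classes rather than the real Storm types. `FlushTupleSketch`, its nested `Tuple`, `AddressedTuple` and `Executor` classes, the `"flush-stream"` id and the `BROADCAST_DEST` value are all placeholders made up for this example. Because the tuple in the diff is built from `workerTopologyContext`, which is per-executor state, a `private final` instance field may be a closer fit than a true `static final`. This also assumes nothing downstream mutates the tuple.

```java
import java.util.Collections;
import java.util.List;

// Stand-in types only: these are NOT the real Storm classes.
final class FlushTupleSketch {

    /** Hypothetical immutable tuple, loosely modeled on the TupleImpl call in the diff. */
    static final class Tuple {
        final String context;
        final List<Object> values;
        final String streamId;

        Tuple(String context, List<Object> values, String streamId) {
            this.context = context;
            this.values = Collections.unmodifiableList(values);
            this.streamId = streamId;
        }
    }

    /** Hypothetical wrapper pairing a destination with a tuple. */
    static final class AddressedTuple {
        static final int BROADCAST_DEST = -2; // placeholder value for this sketch
        final int dest;
        final Tuple tuple;

        AddressedTuple(int dest, Tuple tuple) {
            this.dest = dest;
            this.tuple = tuple;
        }
    }

    /** Hypothetical executor that builds its flush tuple once. */
    static final class Executor {
        // Built in the constructor because it depends on per-executor state.
        private final AddressedTuple flushTuple;

        Executor(String workerTopologyContext) {
            Tuple tuple = new Tuple(workerTopologyContext, Collections.emptyList(), "flush-stream");
            this.flushTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
        }

        // Returns the cached instance instead of allocating on every call.
        AddressedTuple flushTuple() {
            return flushTuple;
        }
    }

    public static void main(String[] args) {
        Executor executor = new Executor("worker-ctx");
        // Both calls hand back the same object, so this prints "true".
        System.out.println(executor.flushTuple() == executor.flushTuple());
    }
}
```

Whether this is safe in the PR depends on whether anything that consumes the tuple modifies it or relies on each publish being a distinct object. If so, constructing it per call is the right choice.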