GitHub user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r129501931
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -366,31 +330,42 @@ protected void setupTicks(boolean isSpout) {
@Override
public void run() {
TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
- (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
- List<AddressedTuple> tickTuple =
- Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
- receiveQueue.publish(tickTuple);
+ Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
+ AddressedTuple tickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+ try {
+ receiveQueue.publish(tickTuple);
+ receiveQueue.flush();
+ } catch (InterruptedException e) {
+ LOG.warn("Thread interrupted when emitting tick tuple. Setting interrupt flag.");
+ Thread.currentThread().interrupt();
+ return;
+ }
}
});
}
}
}
-
- private DisruptorQueue mkExecutorBatchQueue(Map<String, Object> topoConf, List<Long> executorId) {
- int sendSize = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
- int waitTimeOutMs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS));
- int batchSize = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE));
- int batchTimeOutMs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
- return new DisruptorQueue("executor" + executorId + "-send-queue", ProducerType.SINGLE,
- sendSize, waitTimeOutMs, batchSize, batchTimeOutMs);
+ // Called by flush-tuple-timer thread
+ public boolean publishFlushTuple() {
+ TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(), Constants.SYSTEM_COMPONENT_ID,
+ (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_FLUSH_STREAM_ID);
+ AddressedTuple flushTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+ if( receiveQueue.tryPublish(flushTuple) ) {
+ LOG.debug("Published Flush tuple to: {} ", getComponentId());
--- End diff --
That's fine. It's not doing any lookups, nor is it in the critical path.
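
For readers following the diff, here is a rough sketch of the two publish styles it uses: a blocking publish that restores the interrupt flag when interrupted (the tick path), and a non-blocking try-publish that reports failure when the queue is full (the flush path). This is not Storm's code. `FlushPublishSketch` and its methods are hypothetical names, and `java.util.concurrent.ArrayBlockingQueue` stands in for the executor's receive queue, whose real behavior may differ.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration only; not part of Apache Storm.
public class FlushPublishSketch {
    // Stand-in for the executor's receiveQueue (assumption: a bounded queue).
    private final BlockingQueue<String> receiveQueue;

    public FlushPublishSketch(int capacity) {
        this.receiveQueue = new ArrayBlockingQueue<>(capacity);
    }

    // Blocking publish, loosely analogous to the tick-tuple path in the diff:
    // on interruption, restore the interrupt flag and give up.
    public boolean publishTick(String tick) {
        try {
            receiveQueue.put(tick); // blocks while the queue is full
            return true;
        } catch (InterruptedException e) {
            // put() clears the flag when it throws; set it again for callers.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Non-blocking publish, loosely analogous to the flush-tuple path:
    // returns false immediately if there is no room.
    public boolean tryPublishFlush(String flush) {
        return receiveQueue.offer(flush);
    }

    public static void main(String[] args) {
        FlushPublishSketch sketch = new FlushPublishSketch(1);
        System.out.println("first flush accepted:  " + sketch.tryPublishFlush("flush-1"));
        System.out.println("second flush accepted: " + sketch.tryPublishFlush("flush-2"));

        // Simulate an interrupt arriving before a blocking publish.
        Thread.currentThread().interrupt();
        System.out.println("tick published:        " + sketch.publishTick("tick"));
        // Thread.interrupted() reads and clears the flag restored above.
        System.out.println("flag was restored:     " + Thread.interrupted());
    }
}
```

In this sketch the timer thread never blocks on the flush path, so a full queue only costs a skipped flush rather than a stalled thread.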