Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r129736631
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -366,31 +330,42 @@ protected void setupTicks(boolean isSpout) {
@Override
public void run() {
TupleImpl tuple = new
TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
- (int) Constants.SYSTEM_TASK_ID,
Constants.SYSTEM_TICK_STREAM_ID);
- List<AddressedTuple> tickTuple =
- Lists.newArrayList(new
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
- receiveQueue.publish(tickTuple);
+ Constants.SYSTEM_COMPONENT_ID, (int)
Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
+ AddressedTuple tickTuple = new
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+ try {
+ receiveQueue.publish(tickTuple);
+ receiveQueue.flush();
+ } catch (InterruptedException e) {
+ LOG.warn("Thread interrupted when emitting
tick tuple. Setting interrupt flag.");
+ Thread.currentThread().interrupt();
+ return;
+ }
}
});
}
}
}
-
- private DisruptorQueue mkExecutorBatchQueue(Map<String, Object>
topoConf, List<Long> executorId) {
- int sendSize =
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
- int waitTimeOutMs =
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS));
- int batchSize =
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE));
- int batchTimeOutMs =
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
- return new DisruptorQueue("executor" + executorId + "-send-queue",
ProducerType.SINGLE,
- sendSize, waitTimeOutMs, batchSize, batchTimeOutMs);
+ // Called by flush-tuple-timer thread
+ public boolean publishFlushTuple() {
+ TupleImpl tuple = new TupleImpl(workerTopologyContext, new
Values(), Constants.SYSTEM_COMPONENT_ID,
+ (int) Constants.SYSTEM_TASK_ID,
Constants.SYSTEM_FLUSH_STREAM_ID);
+ AddressedTuple flushTuple = new
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+ if( receiveQueue.tryPublish(flushTuple) ) {
+ LOG.debug("Published Flush tuple to: {} ", getComponentId());
+ return true;
+ }
+ else {
+ LOG.debug("RecvQ is currently full, will retry later. Unable
to publish Flush tuple to : ", getComponentId());
+ return false;
+ }
}
/**
* Returns map of stream id to component id to grouper
*/
private Map<String, Map<String, LoadAwareCustomStreamGrouping>>
outboundComponents(
- WorkerTopologyContext workerTopologyContext, String
componentId, Map<String, Object> topoConf) {
+ WorkerTopologyContext workerTopologyContext, String
componentId, Map topoConf) {
--- End diff --
yes
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---