Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r129736631
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
    @@ -366,31 +330,42 @@ protected void setupTicks(boolean isSpout) {
                         @Override
                         public void run() {
                             TupleImpl tuple = new 
TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
    -                                (int) Constants.SYSTEM_TASK_ID, 
Constants.SYSTEM_TICK_STREAM_ID);
    -                        List<AddressedTuple> tickTuple =
    -                                Lists.newArrayList(new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
    -                        receiveQueue.publish(tickTuple);
    +                                Constants.SYSTEM_COMPONENT_ID, (int) 
Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
    +                        AddressedTuple tickTuple = new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
    +                        try {
    +                            receiveQueue.publish(tickTuple);
    +                            receiveQueue.flush();
    +                        } catch (InterruptedException e) {
    +                            LOG.warn("Thread interrupted when emitting 
tick tuple. Setting interrupt flag.");
    +                            Thread.currentThread().interrupt();
    +                            return;
    +                        }
                         }
                     });
                 }
             }
         }
     
    -
    -    private DisruptorQueue mkExecutorBatchQueue(Map<String, Object> 
topoConf, List<Long> executorId) {
    -        int sendSize = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
    -        int waitTimeOutMs = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS));
    -        int batchSize = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE));
    -        int batchTimeOutMs = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
    -        return new DisruptorQueue("executor" + executorId + "-send-queue", 
ProducerType.SINGLE,
    -                sendSize, waitTimeOutMs, batchSize, batchTimeOutMs);
    +    // Called by flush-tuple-timer thread
    +    public boolean publishFlushTuple() {
    +        TupleImpl tuple = new TupleImpl(workerTopologyContext, new 
Values(), Constants.SYSTEM_COMPONENT_ID,
    +                (int) Constants.SYSTEM_TASK_ID, 
Constants.SYSTEM_FLUSH_STREAM_ID);
    +        AddressedTuple flushTuple = new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
    +        if( receiveQueue.tryPublish(flushTuple) ) {
    +            LOG.debug("Published Flush tuple to: {} ", getComponentId());
    +            return true;
    +        }
    +        else {
    +            LOG.debug("RecvQ is currently full, will retry later. Unable 
to publish Flush tuple to : ", getComponentId());
    +            return false;
    +        }
         }
     
         /**
          * Returns map of stream id to component id to grouper
          */
         private Map<String, Map<String, LoadAwareCustomStreamGrouping>> 
outboundComponents(
    -            WorkerTopologyContext workerTopologyContext, String 
componentId, Map<String, Object> topoConf) {
    +            WorkerTopologyContext workerTopologyContext, String 
componentId, Map topoConf) {
    --- End diff --
    
    yes


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to