Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r129479705
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
    @@ -366,31 +330,42 @@ protected void setupTicks(boolean isSpout) {
                         @Override
                         public void run() {
                             TupleImpl tuple = new 
TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
    -                                (int) Constants.SYSTEM_TASK_ID, 
Constants.SYSTEM_TICK_STREAM_ID);
    -                        List<AddressedTuple> tickTuple =
    -                                Lists.newArrayList(new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
    -                        receiveQueue.publish(tickTuple);
    +                                Constants.SYSTEM_COMPONENT_ID, (int) 
Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
    +                        AddressedTuple tickTuple = new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
    +                        try {
    +                            receiveQueue.publish(tickTuple);
    +                            receiveQueue.flush();
    +                        } catch (InterruptedException e) {
    +                            LOG.warn("Thread interrupted when emitting 
tick tuple. Setting interrupt flag.");
    +                            Thread.currentThread().interrupt();
    +                            return;
    +                        }
                         }
                     });
                 }
             }
         }
     
    -
    -    private DisruptorQueue mkExecutorBatchQueue(Map<String, Object> 
topoConf, List<Long> executorId) {
    -        int sendSize = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
    -        int waitTimeOutMs = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS));
    -        int batchSize = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE));
    -        int batchTimeOutMs = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
    -        return new DisruptorQueue("executor" + executorId + "-send-queue", 
ProducerType.SINGLE,
    -                sendSize, waitTimeOutMs, batchSize, batchTimeOutMs);
    +    // Called by flush-tuple-timer thread
    +    public boolean publishFlushTuple() {
    +        TupleImpl tuple = new TupleImpl(workerTopologyContext, new 
Values(), Constants.SYSTEM_COMPONENT_ID,
    --- End diff --
    
    Should this be constructed newly for each call? If this is a fixed value 
you can make it a static final value.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to