Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r129218772
  
    --- Diff: 
storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java ---
    @@ -17,72 +17,92 @@
      */
     package org.apache.storm.executor;
     
    -import com.google.common.annotations.VisibleForTesting;
    -import com.lmax.disruptor.EventHandler;
     import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
     import org.apache.storm.serialization.KryoTupleSerializer;
     import org.apache.storm.tuple.AddressedTuple;
     import org.apache.storm.tuple.Tuple;
    -import org.apache.storm.utils.DisruptorQueue;
    -import org.apache.storm.utils.MutableObject;
    +import org.apache.storm.utils.JCQueue;
     import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Utils;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.ArrayList;
    +import java.util.Collections;
     import java.util.Map;
    -import java.util.concurrent.Callable;
     
    -public class ExecutorTransfer implements EventHandler, Callable {
    +public class ExecutorTransfer  {
         private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorTransfer.class);
     
         private final WorkerState workerData;
    -    private final DisruptorQueue batchTransferQueue;
    -    private final Map<String, Object> topoConf;
         private final KryoTupleSerializer serializer;
    -    private final MutableObject cachedEmit;
         private final boolean isDebug;
    +    private final int producerBatchSz;
    +    private int remotesBatchSz = 0;
    +    private final ArrayList<JCQueue> localReceiveQueues; // [taksid]=queue
    +    private final ArrayList<JCQueue> outboundQueues; // [taksid]=queue, 
some entries can be null
     
    -    public ExecutorTransfer(WorkerState workerData, DisruptorQueue 
batchTransferQueue, Map<String, Object> topoConf) {
    +
    +    public ExecutorTransfer(WorkerState workerData, Map<String, Object> 
topoConf) {
             this.workerData = workerData;
    -        this.batchTransferQueue = batchTransferQueue;
    -        this.topoConf = topoConf;
             this.serializer = new KryoTupleSerializer(topoConf, 
workerData.getWorkerTopologyContext());
    -        this.cachedEmit = new MutableObject(new ArrayList<>());
             this.isDebug = 
ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
    +        this.producerBatchSz = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
    +        this.localReceiveQueues = 
Utils.convertToArray(workerData.getShortExecutorReceiveQueueMap());
    +        this.outboundQueues = new 
ArrayList<JCQueue>(Collections.nCopies(localReceiveQueues.size(), null) );
         }
     
    -    public void transfer(int task, Tuple tuple) {
    -        AddressedTuple val = new AddressedTuple(task, tuple);
    +
    +    public void transfer(int task, Tuple tuple) throws 
InterruptedException {
    +        AddressedTuple addressedTuple = new AddressedTuple(task, tuple);
             if (isDebug) {
    -            LOG.info("TRANSFERRING tuple {}", val);
    +            LOG.info("TRANSFERRING tuple {}", addressedTuple);
             }
    -        batchTransferQueue.publish(val);
    -    }
     
    -    @VisibleForTesting
    -    public DisruptorQueue getBatchTransferQueue() {
    -        return this.batchTransferQueue;
    +        boolean isLocal = transferLocal(addressedTuple);
    +        if (!isLocal) {
    +            transferRemote(addressedTuple);
    +            ++remotesBatchSz;
    +            if(remotesBatchSz >=producerBatchSz) {
    +                flushRemotes();
    +                remotesBatchSz =0;
    +            }
    +        }
         }
     
    -    @Override
    -    public Object call() throws Exception {
    -        batchTransferQueue.consumeBatchWhenAvailable(this);
    -        return 0L;
    +    private void transferRemote(AddressedTuple tuple) throws 
InterruptedException {
    +        workerData.transferRemote(tuple);
         }
     
    -    public String getName() {
    -        return batchTransferQueue.getName();
    +    // flushes local and remote messages
    +    public void flush() throws InterruptedException {
    +        flushLocal();
    +        flushRemotes();
         }
     
    -    @Override
    -    public void onEvent(Object event, long sequence, boolean endOfBatch) 
throws Exception {
    -        ArrayList cachedEvents = (ArrayList) cachedEmit.getObject();
    -        cachedEvents.add(event);
    -        if (endOfBatch) {
    -            workerData.transfer(serializer, cachedEvents);
    -            cachedEmit.setObject(new ArrayList<>());
    +    private void flushLocal() throws InterruptedException {
    +        for (int i = 0; i < outboundQueues.size(); i++) {
    +            JCQueue q = outboundQueues.get(i);
    +            if(q!=null)
    +                q.flush();
             }
         }
    +
    +    private void flushRemotes() throws InterruptedException {
    +        workerData.flushRemotes();
    +    }
    +
    +    public boolean transferLocal(AddressedTuple tuple) throws 
InterruptedException {
    +        workerData.checkSerialize(serializer, tuple);
    --- End diff --
    
    I think this should be done only once the tuple is determined to be going 
to a local task. So, this call should be moved down to just before 
`queue.publish(tuple)` below.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to