Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159962361
--- Diff:
storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java ---
@@ -17,72 +17,124 @@
*/
package org.apache.storm.executor;
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.EventHandler;
import org.apache.storm.Config;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.serialization.KryoTupleSerializer;
import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.DisruptorQueue;
-import org.apache.storm.utils.MutableObject;
+import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Queue;
-public class ExecutorTransfer implements EventHandler, Callable {
+// Every executor has an instance of this class
+public class ExecutorTransfer {
private static final Logger LOG =
LoggerFactory.getLogger(ExecutorTransfer.class);
private final WorkerState workerData;
- private final DisruptorQueue batchTransferQueue;
- private final Map<String, Object> topoConf;
private final KryoTupleSerializer serializer;
- private final MutableObject cachedEmit;
private final boolean isDebug;
+ private final int producerBatchSz;
+ private int remotesBatchSz = 0;
+ private int indexingBase = 0;
+ private ArrayList<JCQueue> localReceiveQueues; //
[taskId-indexingBase] => queue : List of all recvQs local to this worker
+ private ArrayList<JCQueue> queuesToFlush; // [taskId-indexingBase] =>
queue, some entries can be null. : outbound Qs for this executor instance
- public ExecutorTransfer(WorkerState workerData, DisruptorQueue
batchTransferQueue, Map<String, Object> topoConf) {
+
+ public ExecutorTransfer(WorkerState workerData, Map<String, Object>
topoConf) {
this.workerData = workerData;
- this.batchTransferQueue = batchTransferQueue;
- this.topoConf = topoConf;
this.serializer = new KryoTupleSerializer(topoConf,
workerData.getWorkerTopologyContext());
- this.cachedEmit = new MutableObject(new ArrayList<>());
this.isDebug =
ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
+ this.producerBatchSz =
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+ }
+
+ // to be called after all Executor objects in the worker are created
and before this object is used
+ public void initLocalRecvQueues() {
+ Integer minTaskId =
workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
+ this.localReceiveQueues = Utils.convertToArray(
workerData.getLocalReceiveQueues(), minTaskId);
+ this.indexingBase = minTaskId;
+ this.queuesToFlush = new
ArrayList<JCQueue>(Collections.nCopies(localReceiveQueues.size(), null) );
}
- public void transfer(int task, Tuple tuple) {
- AddressedTuple val = new AddressedTuple(task, tuple);
+ // adds addressedTuple to destination Q if it is not full; else adds
to pendingEmits (if it's not null)
+ public boolean tryTransfer(AddressedTuple addressedTuple,
Queue<AddressedTuple> pendingEmits) {
if (isDebug) {
- LOG.info("TRANSFERRING tuple {}", val);
+ LOG.info("TRANSFERRING tuple {}", addressedTuple);
+ }
+
+ JCQueue localQueue = getLocalQueue(addressedTuple);
+ if (localQueue!=null) {
+ return tryTransferLocal(addressedTuple, localQueue,
pendingEmits);
+ } else {
+ if (remotesBatchSz >= producerBatchSz) {
+ if ( !workerData.tryFlushRemotes() ) {
+ if (pendingEmits != null) {
+ pendingEmits.add(addressedTuple);
+ }
+ return false;
+ }
+ remotesBatchSz = 0;
--- End diff --
Do we have a race condition here? I believe this method can be called
from multiple different threads, and if so we now have to worry about
keeping remotesBatchSz consistent across them.
---