sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r236770022
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##########
 @@ -17,206 +17,250 @@
  */
 package org.apache.drill.exec.work.filter;
 
-import org.apache.drill.exec.memory.BufferAllocator;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * This sink receives the RuntimeFilters from the netty thread,
- * aggregates them in an async thread, supplies the aggregated
- * one to the fragment running thread.
+ * aggregates them in an async thread, and broadcasts the final
+ * aggregated one to the RuntimeFilterRecordBatch.
  */
-public class RuntimeFilterSink implements AutoCloseable {
-
-  private AtomicInteger currentBookId = new AtomicInteger(0);
+public class RuntimeFilterSink implements Closeable {
 
-  private int staleBookId = 0;
+  private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
 
-  /**
-   * RuntimeFilterWritable holding the aggregated version of all the received filter
-   */
-  private RuntimeFilterWritable aggregated = null;
+  private Map<Integer, Integer> joinMjId2rfNumber;
 
-  private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
+  // HashJoin node's major fragment id to its corresponding probe side nodes' endpoints
+  private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap<>();
 
-  /**
-   * Flag used by Minor Fragment thread to indicate it has encountered error
-   */
-  private AtomicBoolean running = new AtomicBoolean(true);
+  // HashJoin node's major fragment id to the major fragment id of its corresponding probe side scan node
+  private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
 
-  /**
-   * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this
-   * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at
-   * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to
-   * indicate producer not to put any new elements in it.
-   */
-  private ReentrantLock queueLock = new ReentrantLock();
+  // HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+  private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new HashMap<>();
+  // for debug purposes
+  private Map<Integer, Stopwatch> joinMjId2Stopwatch = new HashMap<>();
 
-  private Condition notEmpty = queueLock.newCondition();
+  private DrillbitContext drillbitContext;
 
-  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+  private SendingAccountor sendingAccountor;
 
-  private BufferAllocator bufferAllocator;
+  private AsyncAggregateWorker asyncAggregateWorker;
 
-  private Future future;
+  private AtomicBoolean running = new AtomicBoolean(true);
 
  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
-  public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) {
-    this.bufferAllocator = bufferAllocator;
-    AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-    future = executorService.submit(asyncAggregateWorker);
+  public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor) {
+    this.drillbitContext = drillbitContext;
+    this.sendingAccountor = sendingAccountor;
+    asyncAggregateWorker = new AsyncAggregateWorker();
+    drillbitContext.getExecutor().submit(asyncAggregateWorker);
   }
 
-  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-    if (running.get()) {
-      try {
-        aggregatedRFLock.lock();
-        if (containOne()) {
-          boolean same = aggregated.equals(runtimeFilterWritable);
-          if (!same) {
-            // This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
-            // share the same FragmentContext.
-            aggregated.close();
-            currentBookId.set(0);
-            staleBookId = 0;
-            clearQueued(false);
-          }
-        }
-      } finally {
-        aggregatedRFLock.unlock();
-      }
-
-      try {
-        queueLock.lock();
-        if (rfQueue != null) {
-          rfQueue.add(runtimeFilterWritable);
-          notEmpty.signal();
-        } else {
-          runtimeFilterWritable.close();
-        }
-      } finally {
-        queueLock.unlock();
-      }
-    } else {
+  public void add(RuntimeFilterWritable runtimeFilterWritable) {
+    if (!running.get()) {
       runtimeFilterWritable.close();
+      return;
+    }
+    runtimeFilterWritable.retainBuffers(1);
+    int joinMjId = runtimeFilterWritable.getRuntimeFilterBDef().getMajorFragmentId();
+    if (joinMjId2Stopwatch.get(joinMjId) == null) {
+      Stopwatch stopwatch = Stopwatch.createStarted();
+      joinMjId2Stopwatch.put(joinMjId, stopwatch);
+    }
+    synchronized (rfQueue) {
+      rfQueue.add(runtimeFilterWritable);
+      rfQueue.notify();
     }
   }
 
-  public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() {
-    try {
-      aggregatedRFLock.lock();
-      return aggregated.duplicate(bufferAllocator);
-    } finally {
-      aggregatedRFLock.unlock();
+  public void close() {
+    running.set(false);
+    if (asyncAggregateWorker != null) {
+      synchronized (rfQueue) {
+        rfQueue.notify();
+      }
     }
-  }
+    while (!asyncAggregateWorker.over.get()) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
 
 Review comment:
   Please log the caught interrupted exception.
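   For instance, a minimal sketch of the requested change (the log level and message text are illustrative assumptions, and restoring the interrupt status is a common companion fix rather than something this patch already does):

       } catch (InterruptedException e) {
         // Log instead of silently swallowing the interrupt, and restore the
         // interrupt status so the thread running close() can still observe it.
         logger.warn("Interrupted while waiting for AsyncAggregateWorker to complete", e);
         Thread.currentThread().interrupt();
       }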

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
