[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-27 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r236770022
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -17,206 +17,250 @@
  */
 package org.apache.drill.exec.work.filter;
 
-import org.apache.drill.exec.memory.BufferAllocator;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * This sink receives the RuntimeFilters from the netty thread,
- * aggregates them in an async thread, supplies the aggregated
- * one to the fragment running thread.
+ * aggregates them in an async thread, broadcast the final aggregated
+ * one to the RuntimeFilterRecordBatch.
  */
-public class RuntimeFilterSink implements AutoCloseable {
-
-  private AtomicInteger currentBookId = new AtomicInteger(0);
+public class RuntimeFilterSink implements Closeable
+{
 
-  private int staleBookId = 0;
+  private BlockingQueue rfQueue = new 
LinkedBlockingQueue<>();
 
-  /**
-   * RuntimeFilterWritable holding the aggregated version of all the received 
filter
-   */
-  private RuntimeFilterWritable aggregated = null;
+  private Map joinMjId2rfNumber;
 
-  private BlockingQueue rfQueue = new 
LinkedBlockingQueue<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probeScanEps = new HashMap<>();
 
-  /**
-   * Flag used by Minor Fragment thread to indicate it has encountered error
-   */
-  private AtomicBoolean running = new AtomicBoolean(true);
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
 
-  /**
-   * Lock used to synchronize between producer (Netty Thread) and consumer 
(AsyncAggregateThread) of elements of this
-   * queue. This is needed because in error condition running flag can be 
consumed by producer and consumer thread at
-   * different times. Whoever sees it first will take this lock and clear all 
elements and set the queue to null to
-   * indicate producer not to put any new elements in it.
-   */
-  private ReentrantLock queueLock = new ReentrantLock();
+  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+  private Map joinMjId2AggregatedRF = new 
HashMap<>();
+  //for debug usage
+  private Map joinMjId2Stopwatch = new HashMap<>();
 
-  private Condition notEmpty = queueLock.newCondition();
+  private DrillbitContext drillbitContext;
 
-  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+  private SendingAccountor sendingAccountor;
 
-  private BufferAllocator bufferAllocator;
+  private  AsyncAggregateWorker asyncAggregateWorker;
 
-  private Future future;
+  private AtomicBoolean running = new AtomicBoolean(true);
 
   private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
-  public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService 
executorService) {
-this.bufferAllocator = bufferAllocator;
-AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-future = executorService.submit(asyncAggregateWorker);
+  public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor 
sendingAccountor)
+  {
+this.drillbitContext = drillbitContext;
+this.sendingAccountor = sendingAccountor;
+asyncAggregateWorker = new AsyncAggregateWorker();
+drillbitContext.getExecutor().submit(asyncAggregateWorker);
   }
 
-  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-if 

[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-26 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r236487447
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
 ##
 @@ -136,8 +143,8 @@ public void interrupt(final InterruptedException e) {
   private final AccountingUserConnection accountingUserConnection;
   /** Stores constants and their holders by type */
   private final Map> 
constantValueHolderCache;
-
-  private RuntimeFilterSink runtimeFilterSink;
+  private Map rfIdentifier2RFW = new 
ConcurrentHashMap<>();
+  private Map rfIdentifier2fetched = new ConcurrentHashMap<>();
 
 Review comment:
   RuntimeFilterRecordBatch should not call get multiple times once it has 
received a valid RTF. Anyway, I guess that's not a big deal and can be 
refactored in the future.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-26 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r236397662
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -40,23 +41,18 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This sink receives the RuntimeFilters from the netty thread,
  * aggregates them in an async thread, broadcast the final aggregated
  * one to the RuntimeFilterRecordBatch.
  */
-public class RuntimeFilterSink
+public class RuntimeFilterSink implements Cloneable
 
 Review comment:
   This should implement `Closeable`, not `Cloneable`. Also, there are race 
conditions similar to the ones pointed out in the original implementation of 
this class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-26 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r236406316
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -74,32 +70,43 @@
 
   private SendingAccountor sendingAccountor;
 
+  private  AsyncAggregateWorker asyncAggregateWorker;
+
+  private AtomicBoolean running = new AtomicBoolean(true);
+
   private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
   public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor 
sendingAccountor)
   {
 this.drillbitContext = drillbitContext;
 this.sendingAccountor = sendingAccountor;
-AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-drillbitContext.getExecutor().submit(asyncAggregateWorker);
   }
 
   public void add(RuntimeFilterWritable runtimeFilterWritable)
   {
+if (asyncAggregateWorker == null) {
+  asyncAggregateWorker = new AsyncAggregateWorker();
+  drillbitContext.getExecutor().submit(asyncAggregateWorker);
+}
 runtimeFilterWritable.retainBuffers(1);
 int joinMjId = 
runtimeFilterWritable.getRuntimeFilterBDef().getMajorFragmentId();
 if (joinMjId2Stopwatch.get(joinMjId) == null) {
   Stopwatch stopwatch = Stopwatch.createStarted();
   joinMjId2Stopwatch.put(joinMjId, stopwatch);
 }
-queueLock.lock();
-try {
+synchronized (rfQueue) {
   rfQueue.add(runtimeFilterWritable);
 
 Review comment:
   This should check the `running` status before adding to the queue. When 
`running` is false, the thread should `close` the received filter instead of 
enqueuing it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-26 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r236396758
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -1013,9 +1041,9 @@ public IterOutcome executeBuildPhase() throws 
SchemaChangeException {
 }
 
 if (spilledState.isFirstCycle() && enableRuntimeFilter) {
-  if (bloomFilters.size() > 0) {
+  if (bloomFilter2buildId.size() > 0) {
 int hashJoinOpId = this.popConfig.getOperatorId();
-runtimeFilterReporter.sendOut(bloomFilters, probeFields, 
this.popConfig.getRuntimeFilterDef(), hashJoinOpId);
+runtimeFilterReporter.sendOut(new 
ArrayList<>(bloomFilter2buildId.keySet()), probeFields, 
this.popConfig.getRuntimeFilterDef(), hashJoinOpId);
 
 Review comment:
   We create a `BloomFilter` for each build side field. But here, when sending 
out the filters for all fields, the order of the entries in the `bloomFilter` 
list and in the `probeFields` list can differ. On the receiving end, the first 
message can have bloom filters `<bf_a, bf_b>` with probe fields `<a, b>`, 
whereas the second message can have `<bf_b, bf_a>` with probe fields still 
`<a, b>`. Hence the `aggregator` will aggregate `bf_a` with `bf_b` and `bf_b` 
with `bf_a`, which can produce wrong results. The same happens in 
`applyRuntimeFilter` in the RTF operator, where I guess it's assumed that the 
order of the probe fields and the bloom filter list is consistent.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-26 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r236405542
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -74,32 +70,43 @@
 
   private SendingAccountor sendingAccountor;
 
+  private  AsyncAggregateWorker asyncAggregateWorker;
+
+  private AtomicBoolean running = new AtomicBoolean(true);
+
   private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
   public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor 
sendingAccountor)
   {
 this.drillbitContext = drillbitContext;
 this.sendingAccountor = sendingAccountor;
-AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-drillbitContext.getExecutor().submit(asyncAggregateWorker);
   }
 
   public void add(RuntimeFilterWritable runtimeFilterWritable)
   {
+if (asyncAggregateWorker == null) {
+  asyncAggregateWorker = new AsyncAggregateWorker();
+  drillbitContext.getExecutor().submit(asyncAggregateWorker);
+}
 
 Review comment:
   This should be done once, in the constructor. Doing it here is racy: 
multiple netty threads can each create an `asyncAggregateWorker` thread, and 
all but the last one assigned are leaked.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-26 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r236378230
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
 ##
 @@ -76,6 +77,16 @@ public RuntimeFilterRecordBatch(RuntimeFilterPOP pop, 
RecordBatch incoming, Frag
 enableRFWaiting = 
context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY).bool_val;
 maxWaitingTime = 
context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY).num_val;
 this.rfIdentifier = pop.getIdentifier();
+/**
+int majId = context.getHandle().getMajorFragmentId();
+int operatorId = pop.getOperatorId();
+if (majId == 5 && operatorId == 4) {
+  passed = true;
+}
+if (majId == 3 && operatorId == 13) {
+  passed = true;
+}
+ */
 
 Review comment:
   please remove the commented block and `passed` variable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-26 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r236396536
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -747,12 +751,35 @@ private void setupHash64(HashTableConfig htConfig) 
throws SchemaChangeException
   enableRuntimeFilter = false;
   return;
 }
+RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
+List bloomFilterDefs = 
runtimeFilterDef.getBloomFilterDefs();
+for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
+  String buildField = bloomFilterDef.getBuildField();
+  SchemaPath schemaPath = new SchemaPath(new 
PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN);
+  TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
+  if (typedFieldId == null) {
+missingField = true;
+break;
+  }
+  int fieldId = typedFieldId.getFieldIds()[0];
+  String probeField =  bloomFilterDef.getProbeField();
+  probeFields.add(probeField);
+  bloomFilterDef2buildId.put(bloomFilterDef, fieldId);
+}
+if (missingField) {
+  logger.info("As some build side joint key fields not found, runtime 
filter was disabled");
+  enableRuntimeFilter = false;
+  return;
+}
 ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, 
context);
 try {
   hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds);
 } catch (Exception e) {
   throw new SchemaChangeException("Failed to construct a field's hash64 
dynamic codes", e);
 }
+if (runtimeFilterReporter == null) {
+  runtimeFilterReporter = new 
RuntimeFilterReporter((ExecutorFragmentContext) context);
 
 Review comment:
   With this, `initializeRuntimeFilter` will always return without actually 
creating BloomFilters. Is this initialization required here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-26 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r236407395
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -202,32 +209,36 @@ public void setJoinMjId2ScanMjId(Map 
joinMjId2ScanMjId)
 @Override
 public void run()
 {
-  while (joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty()) {
-queueLock.lock();
-RuntimeFilterWritable toAggregate = rfQueue.poll();
-if (toAggregate == null) {
-  try {
-notEmpty.await();
-  }
-  catch (InterruptedException e) {
-logger.error("RFW_Aggregator thread being interrupted", e);
-  }
-  finally {
-queueLock.unlock();
-  }
-} else {
-  queueLock.unlock();
+  while ((joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty() ) && 
running.get()) {
+RuntimeFilterWritable toAggregate = null;
+synchronized (rfQueue) {
   try {
-aggregate(toAggregate);
-  }
-  catch (Exception e) {
-logger.error("fail to aggregate or route the RFW", e);
-throw new RuntimeException(e);
-  }
-  finally {
-toAggregate.close();
+toAggregate = rfQueue.poll();
+while (toAggregate == null && running.get()) {
+  rfQueue.wait();
+  toAggregate = rfQueue.poll();
+}
+  } catch (InterruptedException ex) {
+logger.error("RFW_Aggregator thread being interrupted", ex);
+continue;
   }
 }
+// perform aggregate outside the sync block.
+try {
+  aggregate(toAggregate);
+} catch (Exception ex) {
+  logger.error("Failed to aggregate or route the RFW", ex);
+  throw new DrillRuntimeException(ex);
+} finally {
+  toAggregate.close();
 
 Review comment:
   check if `toAggregate != null` before close.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-26 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r236387141
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -747,12 +751,35 @@ private void setupHash64(HashTableConfig htConfig) 
throws SchemaChangeException
   enableRuntimeFilter = false;
   return;
 }
+RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
+List bloomFilterDefs = 
runtimeFilterDef.getBloomFilterDefs();
+for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
+  String buildField = bloomFilterDef.getBuildField();
+  SchemaPath schemaPath = new SchemaPath(new 
PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN);
+  TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
+  if (typedFieldId == null) {
+missingField = true;
+break;
+  }
+  int fieldId = typedFieldId.getFieldIds()[0];
+  String probeField =  bloomFilterDef.getProbeField();
+  probeFields.add(probeField);
+  bloomFilterDef2buildId.put(bloomFilterDef, fieldId);
+}
+if (missingField) {
+  logger.info("As some build side joint key fields not found, runtime 
filter was disabled");
 
 Review comment:
   build side **joint** key ---> build side **join** key


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-26 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r236407149
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -202,32 +209,36 @@ public void setJoinMjId2ScanMjId(Map 
joinMjId2ScanMjId)
 @Override
 public void run()
 {
-  while (joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty()) {
-queueLock.lock();
-RuntimeFilterWritable toAggregate = rfQueue.poll();
-if (toAggregate == null) {
-  try {
-notEmpty.await();
-  }
-  catch (InterruptedException e) {
-logger.error("RFW_Aggregator thread being interrupted", e);
-  }
-  finally {
-queueLock.unlock();
-  }
-} else {
-  queueLock.unlock();
+  while ((joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty() ) && 
running.get()) {
+RuntimeFilterWritable toAggregate = null;
+synchronized (rfQueue) {
   try {
-aggregate(toAggregate);
-  }
-  catch (Exception e) {
-logger.error("fail to aggregate or route the RFW", e);
-throw new RuntimeException(e);
-  }
-  finally {
-toAggregate.close();
+toAggregate = rfQueue.poll();
+while (toAggregate == null && running.get()) {
+  rfQueue.wait();
+  toAggregate = rfQueue.poll();
+}
+  } catch (InterruptedException ex) {
+logger.error("RFW_Aggregator thread being interrupted", ex);
+continue;
   }
 }
+// perform aggregate outside the sync block.
+try {
+  aggregate(toAggregate);
 
 Review comment:
   Check that `toAggregate != null` and only then call 
`aggregate(toAggregate)`. `toAggregate` can be null when `running` is false.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-26 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r236396402
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -747,12 +751,35 @@ private void setupHash64(HashTableConfig htConfig) 
throws SchemaChangeException
   enableRuntimeFilter = false;
   return;
 }
+RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
 
 Review comment:
   `runtimeFilterDef` can be null here


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-13 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r233325858
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -17,206 +17,218 @@
  */
 package org.apache.drill.exec.work.filter;
 
-import org.apache.drill.exec.memory.BufferAllocator;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * This sink receives the RuntimeFilters from the netty thread,
- * aggregates them in an async thread, supplies the aggregated
- * one to the fragment running thread.
+ * aggregates them in an async thread, broadcast the final aggregated
+ * one to the RuntimeFilterRecordBatch.
  */
-public class RuntimeFilterSink implements AutoCloseable {
-
-  private AtomicInteger currentBookId = new AtomicInteger(0);
-
-  private int staleBookId = 0;
-
-  /**
-   * RuntimeFilterWritable holding the aggregated version of all the received 
filter
-   */
-  private RuntimeFilterWritable aggregated = null;
+public class RuntimeFilterSink
+{
 
   private BlockingQueue rfQueue = new 
LinkedBlockingQueue<>();
 
-  /**
-   * Flag used by Minor Fragment thread to indicate it has encountered error
-   */
-  private AtomicBoolean running = new AtomicBoolean(true);
-
-  /**
-   * Lock used to synchronize between producer (Netty Thread) and consumer 
(AsyncAggregateThread) of elements of this
-   * queue. This is needed because in error condition running flag can be 
consumed by producer and consumer thread at
-   * different times. Whoever sees it first will take this lock and clear all 
elements and set the queue to null to
-   * indicate producer not to put any new elements in it.
-   */
   private ReentrantLock queueLock = new ReentrantLock();
 
   private Condition notEmpty = queueLock.newCondition();
 
-  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+  private Map joinMjId2rfNumber;
+
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probeScanEps = new HashMap<>();
+
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
 
-  private BufferAllocator bufferAllocator;
+  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+  private Map joinMjId2AggregatedRF = new 
HashMap<>();
+  //for debug usage
+  private Map joinMjId2Stopwatch = new HashMap<>();
 
-  private Future future;
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor;
 
   private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
-  public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService 
executorService) {
-this.bufferAllocator = bufferAllocator;
+  public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor 
sendingAccountor)
+  {
+this.drillbitContext = drillbitContext;
+this.sendingAccountor = sendingAccountor;
 AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-future = executorService.submit(asyncAggregateWorker);
+drillbitContext.getExecutor().submit(asyncAggregateWorker);
   }
 
-  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-if (running.get()) {
-  try {
-aggregatedRFLock.lock();
-if (containOne()) {
-  boolean same = aggregated.equals(runtimeFilterWritable);
-  if (!same) {
-// This is to solve the only one fragment case that two 
RuntimeFilterRecordBatchs
-// share the same FragmentContext.
-

[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-13 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r233261620
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -17,206 +17,217 @@
  */
 package org.apache.drill.exec.work.filter;
 
-import org.apache.drill.exec.memory.BufferAllocator;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * This sink receives the RuntimeFilters from the netty thread,
- * aggregates them in an async thread, supplies the aggregated
- * one to the fragment running thread.
+ * aggregates them in an async thread, broadcast the final aggregated
+ * one to the RuntimeFilterRecordBatch.
  */
-public class RuntimeFilterSink implements AutoCloseable {
-
-  private AtomicInteger currentBookId = new AtomicInteger(0);
-
-  private int staleBookId = 0;
-
-  /**
-   * RuntimeFilterWritable holding the aggregated version of all the received 
filter
-   */
-  private RuntimeFilterWritable aggregated = null;
+public class RuntimeFilterSink
+{
 
   private BlockingQueue rfQueue = new 
LinkedBlockingQueue<>();
 
-  /**
-   * Flag used by Minor Fragment thread to indicate it has encountered error
-   */
-  private AtomicBoolean running = new AtomicBoolean(true);
-
-  /**
-   * Lock used to synchronize between producer (Netty Thread) and consumer 
(AsyncAggregateThread) of elements of this
-   * queue. This is needed because in error condition running flag can be 
consumed by producer and consumer thread at
-   * different times. Whoever sees it first will take this lock and clear all 
elements and set the queue to null to
-   * indicate producer not to put any new elements in it.
-   */
   private ReentrantLock queueLock = new ReentrantLock();
 
   private Condition notEmpty = queueLock.newCondition();
 
-  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+  private Map joinMjId2rfNumber;
+
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probeScanEps = new HashMap<>();
 
-  private BufferAllocator bufferAllocator;
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
 
-  private Future future;
+  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+  private Map joinMjId2AggregatedRF = new 
HashMap<>();
+  //for debug usage
+  private Map joinMjId2Stopwatch = new HashMap<>();
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor;
 
   private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
-  public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService 
executorService) {
-this.bufferAllocator = bufferAllocator;
+  public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor 
sendingAccountor)
+  {
+this.drillbitContext = drillbitContext;
+this.sendingAccountor = sendingAccountor;
 AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-future = executorService.submit(asyncAggregateWorker);
+drillbitContext.getExecutor().submit(asyncAggregateWorker);
   }
 
-  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-if (running.get()) {
-  try {
-aggregatedRFLock.lock();
-if (containOne()) {
-  boolean same = aggregated.equals(runtimeFilterWritable);
-  if (!same) {
-// This is to solve the only one fragment case that two 
RuntimeFilterRecordBatchs
-// share the same FragmentContext.
-

[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-13 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r233262063
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -17,206 +17,218 @@
  */
 package org.apache.drill.exec.work.filter;
 
-import org.apache.drill.exec.memory.BufferAllocator;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * This sink receives the RuntimeFilters from the netty thread,
- * aggregates them in an async thread, supplies the aggregated
- * one to the fragment running thread.
+ * aggregates them in an async thread, broadcast the final aggregated
+ * one to the RuntimeFilterRecordBatch.
  */
-public class RuntimeFilterSink implements AutoCloseable {
-
-  private AtomicInteger currentBookId = new AtomicInteger(0);
-
-  private int staleBookId = 0;
-
-  /**
-   * RuntimeFilterWritable holding the aggregated version of all the received 
filter
-   */
-  private RuntimeFilterWritable aggregated = null;
+public class RuntimeFilterSink
+{
 
   private BlockingQueue rfQueue = new 
LinkedBlockingQueue<>();
 
-  /**
-   * Flag used by Minor Fragment thread to indicate it has encountered error
-   */
-  private AtomicBoolean running = new AtomicBoolean(true);
-
-  /**
-   * Lock used to synchronize between producer (Netty Thread) and consumer 
(AsyncAggregateThread) of elements of this
-   * queue. This is needed because in error condition running flag can be 
consumed by producer and consumer thread at
-   * different times. Whoever sees it first will take this lock and clear all 
elements and set the queue to null to
-   * indicate producer not to put any new elements in it.
-   */
   private ReentrantLock queueLock = new ReentrantLock();
 
   private Condition notEmpty = queueLock.newCondition();
 
-  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+  private Map joinMjId2rfNumber;
+
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probeScanEps = new HashMap<>();
+
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
 
-  private BufferAllocator bufferAllocator;
+  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+  private Map joinMjId2AggregatedRF = new 
HashMap<>();
+  //for debug usage
+  private Map joinMjId2Stopwatch = new HashMap<>();
 
-  private Future future;
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor;
 
   private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
-  public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService 
executorService) {
-this.bufferAllocator = bufferAllocator;
+  public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor 
sendingAccountor)
+  {
+this.drillbitContext = drillbitContext;
+this.sendingAccountor = sendingAccountor;
 AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-future = executorService.submit(asyncAggregateWorker);
+drillbitContext.getExecutor().submit(asyncAggregateWorker);
   }
 
-  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-if (running.get()) {
-  try {
-aggregatedRFLock.lock();
-if (containOne()) {
-  boolean same = aggregated.equals(runtimeFilterWritable);
-  if (!same) {
-// This is to solve the only one fragment case that two 
RuntimeFilterRecordBatchs
-// share the same FragmentContext.
-

[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-13 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r233207751
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
 ##
 @@ -136,8 +143,8 @@ public void interrupt(final InterruptedException e) {
   private final AccountingUserConnection accountingUserConnection;
   /** Stores constants and their holders by type */
   private final Map> 
constantValueHolderCache;
-
-  private RuntimeFilterSink runtimeFilterSink;
+  private Map rfIdentifier2RFW = new 
ConcurrentHashMap<>();
+  private Map rfIdentifier2fetched = new ConcurrentHashMap<>();
 
 Review comment:
   The extra decrease is because of this extra 
[retainBuffers](https://github.com/apache/drill/pull/1504/files#diff-11d71582cb7541a6ace7d9a1d7072c40R374)
 call. I think we can get rid of that `retainBuffers` call, and then we don't 
have to call `release` twice for an unconsumed RuntimeFilterWritable. Consider 
the flows below:
   
   - `RuntimeFilterWritable` is received from Netty via `receiveRuntimeFilter` 
(ref count 1)
   - `retainBuffers` is called in receiveRuntimeFilter (ref count 2)
   - `addRuntimeFilter` is called. Let's not increment the refCount here and 
just put the filter in the map `rfIdentifier2RFW` (so ref count is still 2)
   - Netty thread will call release (ref count 1)
   - Now there can be 2 cases:
   -- If filter is consumed by RTF operator then on close ref count will be 
decreased (ref count 0)
  For the consumed key RuntimeFilterWritable will be removed from map 
`rfIdentifier2RFW`
   -- If filter is not consumed then it will still be present in the map and 
`closeReceivedRFWs` will release the buffer (ref count 0)
   
   In another case:
   - `RuntimeFilterWritable` buffers are created by HashJoin operator (ref 
count 1)
   - `addRuntimeFilter` is called by HashJoin operator thread if sendToForeman 
is set to false. (ref count 1)
   -  Now there can be 2 cases:
   -- If filter is consumed by RTF operator then on close ref count will be 
decreased (ref count 0)
  For the consumed key RuntimeFilterWritable will be removed from map 
`rfIdentifier2RFW`
   -- If filter is not consumed then it will still be present in the map and 
`closeReceivedRFWs` will release the buffer (ref count 0)
   
   Based on the above, we can also get rid of `rfIdentifier2fetched`, but we 
have to be extra careful when looking for a RuntimeFilterWritable in 
`rfIdentifier2RFW`. We have to call `containsKey()` to check first in 
`getRuntimeFilter` rather than calling `get` on `rfIdentifier2RFW`. Since the 
map allows null values, a `get` call can create a new key with a null value. 
Or just implement `closeReceivedRFWs` with a check for null values.
   
   ```
   private void closeReceivedRFWs() {
   for (RuntimeFilterWritable runtimeFilterWritable : 
rfIdentifier2RFW.values()){
  if (runtimeFilterWritable == null) {
   continue;
  }
  long rfIdentifier = 
runtimeFilterWritable.getRuntimeFilterBDef().getRfIdentifier();
  runtimeFilterWritable.close();
   }
}
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-13 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r233261620
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -17,206 +17,217 @@
  */
 package org.apache.drill.exec.work.filter;
 
-import org.apache.drill.exec.memory.BufferAllocator;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * This sink receives the RuntimeFilters from the netty thread,
- * aggregates them in an async thread, supplies the aggregated
- * one to the fragment running thread.
+ * aggregates them in an async thread, broadcast the final aggregated
+ * one to the RuntimeFilterRecordBatch.
  */
-public class RuntimeFilterSink implements AutoCloseable {
-
-  private AtomicInteger currentBookId = new AtomicInteger(0);
-
-  private int staleBookId = 0;
-
-  /**
-   * RuntimeFilterWritable holding the aggregated version of all the received 
filter
-   */
-  private RuntimeFilterWritable aggregated = null;
+public class RuntimeFilterSink
+{
 
   private BlockingQueue rfQueue = new 
LinkedBlockingQueue<>();
 
-  /**
-   * Flag used by Minor Fragment thread to indicate it has encountered error
-   */
-  private AtomicBoolean running = new AtomicBoolean(true);
-
-  /**
-   * Lock used to synchronize between producer (Netty Thread) and consumer 
(AsyncAggregateThread) of elements of this
-   * queue. This is needed because in error condition running flag can be 
consumed by producer and consumer thread at
-   * different times. Whoever sees it first will take this lock and clear all 
elements and set the queue to null to
-   * indicate producer not to put any new elements in it.
-   */
   private ReentrantLock queueLock = new ReentrantLock();
 
   private Condition notEmpty = queueLock.newCondition();
 
-  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+  private Map joinMjId2rfNumber;
+
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probeScanEps = new HashMap<>();
 
-  private BufferAllocator bufferAllocator;
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
 
-  private Future future;
+  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+  private Map joinMjId2AggregatedRF = new 
HashMap<>();
+  //for debug usage
+  private Map joinMjId2Stopwatch = new HashMap<>();
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor;
 
   private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
-  public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService 
executorService) {
-this.bufferAllocator = bufferAllocator;
+  public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor 
sendingAccountor)
+  {
+this.drillbitContext = drillbitContext;
+this.sendingAccountor = sendingAccountor;
 AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-future = executorService.submit(asyncAggregateWorker);
+drillbitContext.getExecutor().submit(asyncAggregateWorker);
   }
 
-  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-if (running.get()) {
-  try {
-aggregatedRFLock.lock();
-if (containOne()) {
-  boolean same = aggregated.equals(runtimeFilterWritable);
-  if (!same) {
-// This is to solve the only one fragment case that two 
RuntimeFilterRecordBatchs
-// share the same FragmentContext.
-

[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-12 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r232866925
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
 ##
 @@ -136,8 +143,8 @@ public void interrupt(final InterruptedException e) {
   private final AccountingUserConnection accountingUserConnection;
   /** Stores constants and their holders by type */
   private final Map> 
constantValueHolderCache;
-
-  private RuntimeFilterSink runtimeFilterSink;
+  private Map rfIdentifier2RFW = new 
ConcurrentHashMap<>();
+  private Map rfIdentifier2fetched = new ConcurrentHashMap<>();
 
 Review comment:
   The information that the `rfIdentifier2fetched` map stores can be 
implicitly derived from the first map, `rfIdentifier2RFW`. Once the 
`RuntimeFilterWritable` is consumed, just remove it from the first map. Then in 
`closeReceivedRFWs`, just call `close` on each `RuntimeFilterWritable` that is 
still present.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-12 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r232881982
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -17,206 +17,217 @@
  */
 package org.apache.drill.exec.work.filter;
 
-import org.apache.drill.exec.memory.BufferAllocator;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * This sink receives the RuntimeFilters from the netty thread,
- * aggregates them in an async thread, supplies the aggregated
- * one to the fragment running thread.
+ * aggregates them in an async thread, broadcast the final aggregated
+ * one to the RuntimeFilterRecordBatch.
  */
-public class RuntimeFilterSink implements AutoCloseable {
-
-  private AtomicInteger currentBookId = new AtomicInteger(0);
-
-  private int staleBookId = 0;
-
-  /**
-   * RuntimeFilterWritable holding the aggregated version of all the received 
filter
-   */
-  private RuntimeFilterWritable aggregated = null;
+public class RuntimeFilterSink
+{
 
   private BlockingQueue rfQueue = new 
LinkedBlockingQueue<>();
 
-  /**
-   * Flag used by Minor Fragment thread to indicate it has encountered error
-   */
-  private AtomicBoolean running = new AtomicBoolean(true);
-
-  /**
-   * Lock used to synchronize between producer (Netty Thread) and consumer 
(AsyncAggregateThread) of elements of this
-   * queue. This is needed because in error condition running flag can be 
consumed by producer and consumer thread at
-   * different times. Whoever sees it first will take this lock and clear all 
elements and set the queue to null to
-   * indicate producer not to put any new elements in it.
-   */
   private ReentrantLock queueLock = new ReentrantLock();
 
   private Condition notEmpty = queueLock.newCondition();
 
-  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+  private Map joinMjId2rfNumber;
+
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probeScanEps = new HashMap<>();
 
-  private BufferAllocator bufferAllocator;
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
 
-  private Future future;
+  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+  private Map joinMjId2AggregatedRF = new 
HashMap<>();
+  //for debug usage
+  private Map joinMjId2Stopwatch = new HashMap<>();
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor;
 
   private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
-  public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService 
executorService) {
-this.bufferAllocator = bufferAllocator;
+  public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor 
sendingAccountor)
+  {
+this.drillbitContext = drillbitContext;
+this.sendingAccountor = sendingAccountor;
 AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-future = executorService.submit(asyncAggregateWorker);
+drillbitContext.getExecutor().submit(asyncAggregateWorker);
   }
 
-  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-if (running.get()) {
-  try {
-aggregatedRFLock.lock();
-if (containOne()) {
-  boolean same = aggregated.equals(runtimeFilterWritable);
-  if (!same) {
-// This is to solve the only one fragment case that two 
RuntimeFilterRecordBatchs
-// share the same FragmentContext.
-

[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-11 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r232526727
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
 ##
 @@ -58,9 +55,14 @@
 
   private Set toAddRuntimeFilter = new HashSet<>();
 
+  private Multimap probeSideScan2hj = 
HashMultimap.create();
+
   private double fpp;
+
   private int bloomFilterMaxSizeInBytesDef;
 
+  private static final AtomicLong atomicLong = new AtomicLong();
 
 Review comment:
   please rename this variable to something like `rtfIdCounter`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-11 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r232529229
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
 ##
 @@ -100,8 +102,18 @@ public Prel visitJoin(JoinPrel prel, Void value) throws 
RuntimeException {
   @Override
   public Prel visitScan(ScanPrel prel, Void value) throws RuntimeException {
 if (toAddRuntimeFilter.contains(prel)) {
-  //Spawn a fresh RuntimeFilterPrel over the previous identified probe 
side scan node.
-  RuntimeFilterPrel runtimeFilterPrel = new RuntimeFilterPrel(prel);
+  //Spawn a fresh RuntimeFilterPrel over the previous identified probe 
side scan node or a runtime filter node.
+  Collection hashJoinPrels = probeSideScan2hj.get(prel);
+  RuntimeFilterPrel runtimeFilterPrel = null;
+  for (HashJoinPrel hashJoinPrel : hashJoinPrels) {
+long identifier = atomicLong.incrementAndGet();
+
hashJoinPrel.getRuntimeFilterDef().setRuntimeFilterIdentifier(identifier);
+if (runtimeFilterPrel == null) {
+  runtimeFilterPrel = new RuntimeFilterPrel(prel, identifier);
+} else {
+  runtimeFilterPrel = new RuntimeFilterPrel(runtimeFilterPrel, 
identifier);
 
 Review comment:
   In this case it will generate 2 RuntimeFilters stacked on top of each 
other. Shouldn't we instead combine the 2 RTFs into a single RTF, aggregating 
the bloom filters with some AND operation? An example of a case where this may 
happen:
   
            HJ
           /  \
         HJ    Scan
        /  \
     Scan  Scan


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-11 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r232526049
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
 ##
 @@ -58,12 +60,15 @@ public void sendOut(List bloomFilters, 
List probeFields, bo
 for (String probeFiled : probeFields) {
   builder.addProbeFields(probeFiled);
 }
+boolean comingFromHj = sendToForeman ? false : true;
 
 Review comment:
   It looks like `comingFromHj` is basically `!sendToForeman`, and 
`sendToForeman` is already set in the `toForeman` field of 
`RuntimeFilterBDef`. So the `comingFromHj` field is redundant.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-02 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r230527544
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
 ##
 @@ -362,12 +368,35 @@ public boolean isUserAuthenticationEnabled() {
 
   @Override
   public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-this.runtimeFilterSink.aggregate(runtimeFilter);
+this.runtimeFilterWritable = runtimeFilter;
+if (enableRFWaiting) {
+  condition.signal();
+}
+  }
+
+  @Override
+  public RuntimeFilterWritable getRuntimeFilter() {
+return runtimeFilterWritable;
   }
 
   @Override
-  public RuntimeFilterSink getRuntimeFilterSink() {
-return runtimeFilterSink;
+  public RuntimeFilterWritable getRuntimeFilter(long maxWaitTime, TimeUnit 
timeUnit) {
+if (runtimeFilterWritable != null) {
+  return runtimeFilterWritable;
+}
+if (enableRFWaiting) {
+  lock.lock();
+  try {
+if (runtimeFilterWritable != null) {
+  condition.await(maxWaitTime, timeUnit);
+}
+  } catch (InterruptedException e) {
+logger.debug("Condition was interrupted", e);
+  } finally {
+lock.unlock();
+  }
+}
+return runtimeFilterWritable;
 
 Review comment:
   ```
   lockForRF.lock();
   try {
  if (runtimeFilterWritable != null) {
  return runtimeFilterWritable;
  }
  if (enableRFWaiting) {
  conditionForRF.await(maxWaitTime, timeUnit);
  }
  return runtimeFilterWritable;
   } catch (InterruptedException ex) {
  logger.debug("Conditional wait for RF was interrupted", ex);
   } finally {
  lockForRF.unlock();
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-02 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r230515093
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MurmurHash3.java
 ##
 @@ -20,7 +20,6 @@
 import io.netty.buffer.DrillBuf;
 import io.netty.util.internal.PlatformDependent;
 
-
 
 Review comment:
   Still seeing this file.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-02 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r230519811
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
 ##
 @@ -115,6 +117,10 @@
   private final BufferManager bufferManager;
   private ExecutorState executorState;
   private final ExecutionControls executionControls;
+  private boolean enableRuntimeFilter;
+  private boolean enableRFWaiting;
+  private Lock lock;
+  private Condition condition;
 
 Review comment:
   Please consider renaming these variables to `lockForRF` and `conditionForRF`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-02 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r230520575
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
 ##
 @@ -362,12 +368,35 @@ public boolean isUserAuthenticationEnabled() {
 
   @Override
   public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-this.runtimeFilterSink.aggregate(runtimeFilter);
+this.runtimeFilterWritable = runtimeFilter;
+if (enableRFWaiting) {
+  condition.signal();
+}
+  }
+
+  @Override
+  public RuntimeFilterWritable getRuntimeFilter() {
+return runtimeFilterWritable;
 
 Review comment:
   ```
   lockForRF.lock();
   try {
  return runtimeFilterWritable;
   } finally {
  lockForRF.unlock();
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-02 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r230527876
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
 ##
 @@ -159,18 +158,22 @@ BufferAllocator getNewChildAllocator(final String 
operatorName,
 
   @Override
   void close();
-
-  /**
-   * @return
-   */
-  RuntimeFilterSink getRuntimeFilterSink();
-
   /**
* add a RuntimeFilter when the RuntimeFilter receiver belongs to the same 
MinorFragment
* @param runtimeFilter
*/
   public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter);
 
+  public RuntimeFilterWritable getRuntimeFilter();
+
+  /**
+   * get the RuntimeFilter with a blocking wait, if the waiting option is 
enabled
+   * @param maxWaitTime
+   * @param timeUnit
+   * @return
 
 Review comment:
   Add a description for the `@return` tag; otherwise there are warnings for it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-01 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r230164571
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
 ##
 @@ -125,33 +128,53 @@ public void waitForComplete() {
 
   /**
* This method is passively invoked by receiving a runtime filter from the 
network
-   * @param runtimeFilterWritable
+   *
+   * @param srcRuntimeFilterWritable
*/
-  public void registerRuntimeFilter(RuntimeFilterWritable 
runtimeFilterWritable) {
-broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
+  public void register(RuntimeFilterWritable srcRuntimeFilterWritable) {
+BitData.RuntimeFilterBDef runtimeFilterB = 
srcRuntimeFilterWritable.getRuntimeFilterBDef();
+int joinMajorId = runtimeFilterB.getMajorFragmentId();
+int buildSideRfNumber;
+RuntimeFilterWritable toAggregated;
+synchronized (this) {
+  buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId);
+  buildSideRfNumber--;
+  joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber);
+  toAggregated = joinMjId2AggregatedRF.get(joinMajorId);
+  if (toAggregated == null) {
+toAggregated = srcRuntimeFilterWritable;
+toAggregated.retainBuffers(1);
 
 Review comment:
   makes sense. Thanks for explanation


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-01 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r230166642
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
 ##
 @@ -263,4 +260,41 @@ public void dump() {
 + "originalRecordCount={}, batchSchema={}]",
 container, sv2, toFilterFields, originalRecordCount, 
incoming.getSchema());
   }
+
+  public enum Metric implements MetricDef {
+FILTERED_ROWS, APPLIED_TIMES;
+
+@Override
+public int metricId() {
+  return ordinal();
+}
+  }
+
+  public void updateStats() {
+stats.setLongStat(Metric.FILTERED_ROWS, filteredRows);
+stats.setLongStat(Metric.APPLIED_TIMES, appliedTimes);
+  }
+
+  private void timedWaiting() {
+if (!enableRFWaiting || waited) {
+  return;
+}
+long startMs = System.currentTimeMillis();
+while (current == null && batchTimes > 0) {
 
 Review comment:
   Ahh, right. Thanks for the explanation. Please add a comment explaining that the 
downstream HashJoinBatch prefetches the first batch from both sides in the buildSchema 
phase, hence the waiting is done after that phase.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-11-01 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r230165128
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
 ##
 @@ -125,33 +128,53 @@ public void waitForComplete() {
 
   /**
* This method is passively invoked by receiving a runtime filter from the 
network
-   * @param runtimeFilterWritable
+   *
+   * @param srcRuntimeFilterWritable
*/
-  public void registerRuntimeFilter(RuntimeFilterWritable 
runtimeFilterWritable) {
-broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
+  public void register(RuntimeFilterWritable srcRuntimeFilterWritable) {
+BitData.RuntimeFilterBDef runtimeFilterB = 
srcRuntimeFilterWritable.getRuntimeFilterBDef();
+int joinMajorId = runtimeFilterB.getMajorFragmentId();
+int buildSideRfNumber;
+RuntimeFilterWritable toAggregated;
+synchronized (this) {
+  buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId);
+  buildSideRfNumber--;
+  joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber);
+  toAggregated = joinMjId2AggregatedRF.get(joinMajorId);
+  if (toAggregated == null) {
+toAggregated = srcRuntimeFilterWritable;
+toAggregated.retainBuffers(1);
+  } else {
+toAggregated.aggregate(srcRuntimeFilterWritable);
+  }
+  joinMjId2AggregatedRF.put(joinMajorId, toAggregated);
+}
+if (buildSideRfNumber == 0) {
 
 Review comment:
   Yes makes sense. Somehow I missed that `buildSideRfNumber` is a local 
variable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-31 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229909546
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
 ##
 @@ -103,6 +103,27 @@ public RuntimeFilterWritable duplicate(BufferAllocator 
bufferAllocator) {
 return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
   }
 
+  public void retainBuffers(final int increment) {
+if (increment <= 0) {
+  return;
+}
+for (final DrillBuf buf : data) {
+  buf.retain(increment);
+}
+  }
+
+  public RuntimeFilterWritable newRuntimeFilterWritable(BufferAllocator 
bufferAllocator) {
 
 Review comment:
   Please add a TODO comment stating that this method is not used currently, and 
reference the JIRA number (which you will create) along with an explanation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-31 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229765087
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
 ##
 @@ -134,8 +142,15 @@ private RuntimeFilterDef 
generateRuntimeFilter(HashJoinPrel hashJoinPrel) {
 
 List bloomFilterDefs = new ArrayList<>();
 //find the possible left scan node of the left join key
-GroupScan groupScan = null;
+ScanPrel probeSideScanPrel = null;
 RelNode left = hashJoinPrel.getLeft();
+RelNode right = hashJoinPrel.getRight();
+ExchangePrel exchangePrel = findRightExchangePrel(right);
+if (exchangePrel == null) {
+  //Does not support the single fragment mode ,that is the right build side
+  //can only be BroadcastExchangePrel or HashToRandomExchangePrel
+  return null;
+}
 List leftFields = left.getRowType().getFieldNames();
 List leftKeys = hashJoinPrel.getLeftKeys();
 RelMetadataQuery metadataQuery = left.getCluster().getMetadataQuery();
 
 Review comment:
   Yes, that was my point about why a ScanPrel is looked up for all left keys. The loop 
should break once we have found a ScanPrel for any left key.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-31 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229909442
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -115,7 +98,8 @@ public void aggregate(RuntimeFilterWritable 
runtimeFilterWritable) {
   public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() {
 try {
   aggregatedRFLock.lock();
-  return aggregated.duplicate(bufferAllocator);
+  RuntimeFilterWritable duplicated = aggregated.duplicate(bufferAllocator);
 
 Review comment:
   This class is not required anymore. It's only used in tests where we should 
replace the reference with correct implementation. Basically update the test 
implementation of [this 
method](https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java#L172)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-31 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229761029
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
 ##
 @@ -263,4 +260,41 @@ public void dump() {
 + "originalRecordCount={}, batchSchema={}]",
 container, sv2, toFilterFields, originalRecordCount, 
incoming.getSchema());
   }
+
+  public enum Metric implements MetricDef {
+FILTERED_ROWS, APPLIED_TIMES;
+
+@Override
+public int metricId() {
+  return ordinal();
+}
+  }
+
+  public void updateStats() {
+stats.setLongStat(Metric.FILTERED_ROWS, filteredRows);
+stats.setLongStat(Metric.APPLIED_TIMES, appliedTimes);
+  }
+
+  private void timedWaiting() {
+if (!enableRFWaiting || waited) {
+  return;
+}
+long startMs = System.currentTimeMillis();
+while (current == null && batchTimes > 0) {
 
 Review comment:
   What's the reason for waiting after processing the first batch? Why not wait 
just on the condition **(current==null)**? 
   Also, it looks like this wait should be designed using a condition 
variable: FragmentContext will signal the condition variable once the 
filter is set, and this minor fragment thread will wait for either the 
timeout or a signal on that condition variable. See 
[here](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Condition.html#await-long-java.util.concurrent.TimeUnit-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-31 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229908821
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
 ##
 @@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final 
FragmentHandle handle) {
   return runningFragments.get(handle);
 }
 
+/**
+ * receive the RuntimeFilter thorough the wire
+ * @param runtimeFilter
+ */
 public void receiveRuntimeFilter(final RuntimeFilterWritable 
runtimeFilter) {
   BitData.RuntimeFilterBDef runtimeFilterDef = 
runtimeFilter.getRuntimeFilterBDef();
   boolean toForeman = runtimeFilterDef.getToForeman();
   QueryId queryId = runtimeFilterDef.getQueryId();
   String queryIdStr = QueryIdHelper.getQueryId(queryId);
+  runtimeFilter.retainBuffers(1);
 
 Review comment:
   Hmm, initially I didn't look at this from the close() call perspective. But even with 
respect to that, it doesn't look like there will be any lost Ack case, especially 
with just the change to transfer the buffer. It looks more like a race 
condition somewhere to me. Can you open a JIRA for this issue, stating that we should 
transfer the buffers and look into the hang issue, so that we can look into it 
after this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-31 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229909632
 
 

 ##
 File path: 
exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
 ##
 @@ -310,6 +310,11 @@ public void addRuntimeFilter(RuntimeFilterWritable 
runtimeFilter) {
   this.runtimeFilterSink.aggregate(runtimeFilter);
 
 Review comment:
   Update this method as RuntimeFilter Sink is not used anymore.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-31 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229908466
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
 ##
 @@ -125,33 +128,53 @@ public void waitForComplete() {
 
   /**
* This method is passively invoked by receiving a runtime filter from the 
network
-   * @param runtimeFilterWritable
+   *
+   * @param srcRuntimeFilterWritable
*/
-  public void registerRuntimeFilter(RuntimeFilterWritable 
runtimeFilterWritable) {
-broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
+  public void register(RuntimeFilterWritable srcRuntimeFilterWritable) {
+BitData.RuntimeFilterBDef runtimeFilterB = 
srcRuntimeFilterWritable.getRuntimeFilterBDef();
+int joinMajorId = runtimeFilterB.getMajorFragmentId();
+int buildSideRfNumber;
+RuntimeFilterWritable toAggregated;
+synchronized (this) {
+  buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId);
+  buildSideRfNumber--;
+  joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber);
+  toAggregated = joinMjId2AggregatedRF.get(joinMajorId);
+  if (toAggregated == null) {
+toAggregated = srcRuntimeFilterWritable;
+toAggregated.retainBuffers(1);
 
 Review comment:
   Why do we need to `retainBuffers` here again? The caller is already retaining this 
buffer once.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-31 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229763641
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -1222,7 +1222,7 @@ public HashJoinBatch(HashJoinPOP popConfig, 
FragmentContext context,
 RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
   "configured output batch size: %d", configuredBatchSize);
 
-enableRuntimeFilter = 
context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
+enableRuntimeFilter = 
context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER) && 
popConfig.getRuntimeFilterDef() != null;
 
 Review comment:
   Is this an error condition when the feature is `enabled` but 
`runtimeFilterDef` is not present in popConfig ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-31 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229754301
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MurmurHash3.java
 ##
 @@ -271,6 +270,5 @@ public static int hash32(double val, long seed) {
   public static int hash32(int start, int end, DrillBuf buffer, int seed) {
 return murmur3_32(start, end, buffer, seed);
   }
-
 }
 
 Review comment:
   please remove this file from your commits. there is only extra space added 
here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-31 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229906462
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
 ##
 @@ -125,33 +128,53 @@ public void waitForComplete() {
 
   /**
* This method is passively invoked by receiving a runtime filter from the 
network
-   * @param runtimeFilterWritable
+   *
+   * @param srcRuntimeFilterWritable
*/
-  public void registerRuntimeFilter(RuntimeFilterWritable 
runtimeFilterWritable) {
-broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
+  public void register(RuntimeFilterWritable srcRuntimeFilterWritable) {
+BitData.RuntimeFilterBDef runtimeFilterB = 
srcRuntimeFilterWritable.getRuntimeFilterBDef();
+int joinMajorId = runtimeFilterB.getMajorFragmentId();
+int buildSideRfNumber;
+RuntimeFilterWritable toAggregated;
+synchronized (this) {
+  buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId);
+  buildSideRfNumber--;
+  joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber);
+  toAggregated = joinMjId2AggregatedRF.get(joinMajorId);
+  if (toAggregated == null) {
+toAggregated = srcRuntimeFilterWritable;
+toAggregated.retainBuffers(1);
+  } else {
+toAggregated.aggregate(srcRuntimeFilterWritable);
+  }
+  joinMjId2AggregatedRF.put(joinMajorId, toAggregated);
+}
+if (buildSideRfNumber == 0) {
 
 Review comment:
   This check needs to happen inside the `synchronized` block as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-31 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229908076
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
 ##
 @@ -125,33 +128,53 @@ public void waitForComplete() {
 
   /**
* This method is passively invoked by receiving a runtime filter from the 
network
-   * @param runtimeFilterWritable
+   *
+   * @param srcRuntimeFilterWritable
*/
-  public void registerRuntimeFilter(RuntimeFilterWritable 
runtimeFilterWritable) {
-broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
+  public void register(RuntimeFilterWritable srcRuntimeFilterWritable) {
+BitData.RuntimeFilterBDef runtimeFilterB = 
srcRuntimeFilterWritable.getRuntimeFilterBDef();
+int joinMajorId = runtimeFilterB.getMajorFragmentId();
+int buildSideRfNumber;
+RuntimeFilterWritable toAggregated;
+synchronized (this) {
+  buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId);
+  buildSideRfNumber--;
+  joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber);
+  toAggregated = joinMjId2AggregatedRF.get(joinMajorId);
+  if (toAggregated == null) {
+toAggregated = srcRuntimeFilterWritable;
+toAggregated.retainBuffers(1);
+  } else {
+toAggregated.aggregate(srcRuntimeFilterWritable);
+  }
+  joinMjId2AggregatedRF.put(joinMajorId, toAggregated);
+}
+if (buildSideRfNumber == 0) {
+  joinMjId2AggregatedRF.remove(joinMajorId);
+  route(toAggregated);
+}
   }
 
-
-  private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable 
srcRuntimeFilterWritable) {
+  private void route(RuntimeFilterWritable srcRuntimeFilterWritable) {
 BitData.RuntimeFilterBDef runtimeFilterB = 
srcRuntimeFilterWritable.getRuntimeFilterBDef();
 int joinMajorId = runtimeFilterB.getMajorFragmentId();
 UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
 List probeFields = runtimeFilterB.getProbeFieldsList();
+List sizeInBytes = runtimeFilterB.getBloomFilterSizeInBytesList();
 DrillBuf[] data = srcRuntimeFilterWritable.getData();
-List scanNodeEps = 
joinMjId2probdeScanEps.get(joinMajorId);
+List scanNodeEps = 
joinMjId2probeScanEps.get(joinMajorId);
+int scanNodeSize = scanNodeEps.size();
+srcRuntimeFilterWritable.retainBuffers(scanNodeSize - 1);
 int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
 for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
   BitData.RuntimeFilterBDef.Builder builder = 
BitData.RuntimeFilterBDef.newBuilder();
   for (String probeField : probeFields) {
 builder.addProbeFields(probeField);
   }
-  BitData.RuntimeFilterBDef runtimeFilterBDef = builder
-.setQueryId(queryId)
-.setMajorFragmentId(scanNodeMjId)
-.setMinorFragmentId(minorId)
-.build();
+  BitData.RuntimeFilterBDef runtimeFilterBDef = 
builder.setQueryId(queryId).setMajorFragmentId(scanNodeMjId).setMinorFragmentId(minorId).setToForeman(false).addAllBloomFilterSizeInBytes(sizeInBytes).build();
 
 Review comment:
   please fix the indentation here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-30 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229501155
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
 ##
 @@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final 
FragmentHandle handle) {
   return runningFragments.get(handle);
 }
 
+/**
+ * receive the RuntimeFilter thorough the wire
+ * @param runtimeFilter
+ */
 public void receiveRuntimeFilter(final RuntimeFilterWritable 
runtimeFilter) {
   BitData.RuntimeFilterBDef runtimeFilterDef = 
runtimeFilter.getRuntimeFilterBDef();
   boolean toForeman = runtimeFilterDef.getToForeman();
   QueryId queryId = runtimeFilterDef.getQueryId();
   String queryIdStr = QueryIdHelper.getQueryId(queryId);
+  runtimeFilter.retainBuffers(1);
 
 Review comment:
   I looked into the code. It's weird that you are seeing the data tunnel 
hang while sending a runtimeFilter, since for runtime filters it doesn't look 
like we are using the throttling listener based on a semaphore count. If an ACK is not 
received, the DataTunnel shouldn't hang while sending a RuntimeFilter. It will 
only hang while sending the DataBatches.
   
   DataTunnel is created here for sending RuntimeFilter: 
https://github.com/apache/drill/pull/1504/files#diff-86eb65fdf93f77f95838f885752e660cR191
   In `tunnel.sendRuntimeFilter` I don't see anywhere `semaphore.acquire` is 
called which will result in hang if Ack is not received. 
https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java#L94


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-29 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229143935
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
 ##
 @@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final 
FragmentHandle handle) {
   return runningFragments.get(handle);
 }
 
+/**
+ * receive the RuntimeFilter thorough the wire
+ * @param runtimeFilter
+ */
 public void receiveRuntimeFilter(final RuntimeFilterWritable 
runtimeFilter) {
   BitData.RuntimeFilterBDef runtimeFilterDef = 
runtimeFilter.getRuntimeFilterBDef();
   boolean toForeman = runtimeFilterDef.getToForeman();
   QueryId queryId = runtimeFilterDef.getQueryId();
   String queryIdStr = QueryIdHelper.getQueryId(queryId);
+  runtimeFilter.retainBuffers(1);
 
 Review comment:
   I will look into where the issue is with the transfer code, since the wire protocol 
should be the same in both cases.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-29 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229143935
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
 ##
 @@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final 
FragmentHandle handle) {
   return runningFragments.get(handle);
 }
 
+/**
+ * receive the RuntimeFilter thorough the wire
+ * @param runtimeFilter
+ */
 public void receiveRuntimeFilter(final RuntimeFilterWritable 
runtimeFilter) {
   BitData.RuntimeFilterBDef runtimeFilterDef = 
runtimeFilter.getRuntimeFilterBDef();
   boolean toForeman = runtimeFilterDef.getToForeman();
   QueryId queryId = runtimeFilterDef.getQueryId();
   String queryIdStr = QueryIdHelper.getQueryId(queryId);
+  runtimeFilter.retainBuffers(1);
 
 Review comment:
   I will look where the issue is with transfer code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-29 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229110793
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
 ##
 @@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final 
FragmentHandle handle) {
   return runningFragments.get(handle);
 }
 
+/**
+ * receive the RuntimeFilter thorough the wire
+ * @param runtimeFilter
+ */
 public void receiveRuntimeFilter(final RuntimeFilterWritable 
runtimeFilter) {
   BitData.RuntimeFilterBDef runtimeFilterDef = 
runtimeFilter.getRuntimeFilterBDef();
   boolean toForeman = runtimeFilterDef.getToForeman();
   QueryId queryId = runtimeFilterDef.getQueryId();
   String queryIdStr = QueryIdHelper.getQueryId(queryId);
+  runtimeFilter.retainBuffers(1);
 
 Review comment:
   When you say it doesn't work, what exactly is the issue with this approach?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-19 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r226783106
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
 ##
 @@ -125,22 +121,20 @@ public void waitForComplete() {
 
   /**
* This method is passively invoked by receiving a runtime filter from the 
network
-   * @param runtimeFilterWritable
+   * @param srcRuntimeFilterWritable
*/
-  public void registerRuntimeFilter(RuntimeFilterWritable 
runtimeFilterWritable) {
-broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
-  }
-
-
-  private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable 
srcRuntimeFilterWritable) {
+  public void route(RuntimeFilterWritable srcRuntimeFilterWritable) {
 BitData.RuntimeFilterBDef runtimeFilterB = 
srcRuntimeFilterWritable.getRuntimeFilterBDef();
 int joinMajorId = runtimeFilterB.getMajorFragmentId();
 UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
 List probeFields = runtimeFilterB.getProbeFieldsList();
+List sizeInBytes = runtimeFilterB.getBloomFilterSizeInBytesList();
 DrillBuf[] data = srcRuntimeFilterWritable.getData();
 List scanNodeEps = 
joinMjId2probdeScanEps.get(joinMajorId);
 
 Review comment:
   joinMjId2probdeScanEps -> joinMjId2**probe**ScanEps


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-19 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r226809347
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
 ##
 @@ -303,10 +332,28 @@ private Wrapper findPhysicalOpContainer(Wrapper wrapper, 
PhysicalOperator op) {
 return opContainer;
   }
 }
-//should not be here
-throw new IllegalStateException(String.format("No valid Wrapper found for 
physicalOperator with id=%d", op.getOperatorId()));
+return null;
+  }
+
+  private Wrapper findRuntimeFilterContainer(Wrapper wrapper, long 
runtimeFilterIdentifier) {
+boolean contain = containsRuntimeFilterPhysicalOperator(wrapper, 
runtimeFilterIdentifier);
+if (contain) {
+  return wrapper;
+}
+List dependencies = wrapper.getFragmentDependencies();
+if (CollectionUtils.isEmpty(dependencies)) {
+  return null;
+}
+for (Wrapper dependencyWrapper : dependencies) {
+  Wrapper opContainer = findRuntimeFilterContainer(dependencyWrapper, 
runtimeFilterIdentifier);
+  if (opContainer != null) {
+return opContainer;
+  }
+}
+return null;
 
 Review comment:
   `findRuntimeFilterContainer` and `findPhysicalOpContainer` do the same 
thing except that they use different visitors. We can refactor this code as 
shown below so that the caller can pass in the appropriate visitor.
   ```suggestion
   private Wrapper findPhysicalOpContainer(Wrapper wrapper, PhysicalVisitor 
visitor) {
  boolean contain = containsPhysicalOperator(visitor);
   if (contain) {
 return wrapper;
   }
   List dependencies = wrapper.getFragmentDependencies();
   if (CollectionUtils.isEmpty(dependencies)) {
 return null;
   }
   for (Wrapper dependencyWrapper : dependencies) {
 Wrapper opContainer = findPhysicalOpContainer(dependencyWrapper, 
visitor);
 if (opContainer != null) {
   return opContainer;
 }
   }
   return null;
 }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-19 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r226784282
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
 ##
 @@ -125,22 +121,20 @@ public void waitForComplete() {
 
   /**
* This method is passively invoked by receiving a runtime filter from the 
network
-   * @param runtimeFilterWritable
+   * @param srcRuntimeFilterWritable
*/
-  public void registerRuntimeFilter(RuntimeFilterWritable 
runtimeFilterWritable) {
-broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
-  }
-
-
-  private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable 
srcRuntimeFilterWritable) {
+  public void route(RuntimeFilterWritable srcRuntimeFilterWritable) {
 
 Review comment:
   I was thinking that maybe we can open an improvement JIRA to do this routing 
directly from the HashJoin fragment to the probe-side RTF operator fragment. We 
can pass the routing information specific to each HashJoin and its 
corresponding probe-side scan while scheduling the HashJoin MinorFragment. This 
will save one extra network hop. In the future, when there is a mechanism to 
transfer a data batch between 2 local minor fragments without a network 
transfer, the same optimization will kick in for RuntimeFilter and avoid any 
network hop.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-19 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r226778025
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
 ##
 @@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final 
FragmentHandle handle) {
   return runningFragments.get(handle);
 }
 
+/**
+ * receive the RuntimeFilter thorough the wire
+ * @param runtimeFilter
+ */
 public void receiveRuntimeFilter(final RuntimeFilterWritable 
runtimeFilter) {
   BitData.RuntimeFilterBDef runtimeFilterDef = 
runtimeFilter.getRuntimeFilterBDef();
   boolean toForeman = runtimeFilterDef.getToForeman();
   QueryId queryId = runtimeFilterDef.getQueryId();
   String queryIdStr = QueryIdHelper.getQueryId(queryId);
+  runtimeFilter.retainBuffers(1);
 
 Review comment:
   We should transfer and then retain the buffers (dbody) received over the 
wire. Transfer will help account for the memory used by the RuntimeFilter in 
the FragmentContext allocator.  See 
[here](https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/IncomingDataBatch.java#L65)
 for example.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-19 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r226731921
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
 ##
 @@ -134,8 +142,15 @@ private RuntimeFilterDef 
generateRuntimeFilter(HashJoinPrel hashJoinPrel) {
 
 List bloomFilterDefs = new ArrayList<>();
 //find the possible left scan node of the left join key
-GroupScan groupScan = null;
+ScanPrel probeSideScanPrel = null;
 RelNode left = hashJoinPrel.getLeft();
+RelNode right = hashJoinPrel.getRight();
+ExchangePrel exchangePrel = findRightExchangePrel(right);
+if (exchangePrel == null) {
+  //Does not support the single fragment mode ,that is the right build side
+  //can only be BroadcastExchangePrel or HashToRandomExchangePrel
+  return null;
+}
 List leftFields = left.getRowType().getFieldNames();
 List leftKeys = hashJoinPrel.getLeftKeys();
 RelMetadataQuery metadataQuery = left.getCluster().getMetadataQuery();
 
 Review comment:
   For line 160, I wanted to understand why we are trying to get the ScanPrel 
node for each left key. Won't all the left keys come from the same ScanPrel, 
considering we are not handling the blocking operator scenario? Basically, I am 
trying to understand when there can be 2 source ScanPrels for the set of left 
keys to a HashJoinPrel. 
   Also, even if there is a case where we can have multiple source ScanPrels, 
on line 188 we are only using the last known ScanPrel node.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

2018-10-19 Thread GitBox
sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r226721691
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
 ##
 @@ -160,7 +160,7 @@ private void setupHashHelper() {
 if (!runtimeFilterSink.containOne()) {
   return;
 }
-if (runtimeFilterSink.hasFreshOne()) {
+if (runtimeFilterSink.hasFreshOne() || current == null) {
 
 Review comment:
   Why is the **current == null** check needed here? By default, current is 
null only the first time, and hasFreshOne() will also return true in that case 
if a valid filter is available.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services