[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r236770022 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -17,206 +17,250 @@ */ package org.apache.drill.exec.work.filter; -import org.apache.drill.exec.memory.BufferAllocator; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** * This sink receives the RuntimeFilters from the netty thread, - * aggregates them in an async thread, supplies the aggregated - * one to the fragment running thread. 
+ * aggregates them in an async thread, broadcast the final aggregated + * one to the RuntimeFilterRecordBatch. */ -public class RuntimeFilterSink implements AutoCloseable { - - private AtomicInteger currentBookId = new AtomicInteger(0); +public class RuntimeFilterSink implements Closeable +{ - private int staleBookId = 0; + private BlockingQueue rfQueue = new LinkedBlockingQueue<>(); - /** - * RuntimeFilterWritable holding the aggregated version of all the received filter - */ - private RuntimeFilterWritable aggregated = null; + private Map joinMjId2rfNumber; - private BlockingQueue rfQueue = new LinkedBlockingQueue<>(); + //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints + private Map> joinMjId2probeScanEps = new HashMap<>(); - /** - * Flag used by Minor Fragment thread to indicate it has encountered error - */ - private AtomicBoolean running = new AtomicBoolean(true); + //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id + private Map joinMjId2ScanMjId = new HashMap<>(); - /** - * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this - * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at - * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to - * indicate producer not to put any new elements in it. 
- */ - private ReentrantLock queueLock = new ReentrantLock(); + //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable + private Map joinMjId2AggregatedRF = new HashMap<>(); + //for debug usage + private Map joinMjId2Stopwatch = new HashMap<>(); - private Condition notEmpty = queueLock.newCondition(); + private DrillbitContext drillbitContext; - private ReentrantLock aggregatedRFLock = new ReentrantLock(); + private SendingAccountor sendingAccountor; - private BufferAllocator bufferAllocator; + private AsyncAggregateWorker asyncAggregateWorker; - private Future future; + private AtomicBoolean running = new AtomicBoolean(true); private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); - public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) { -this.bufferAllocator = bufferAllocator; -AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); -future = executorService.submit(asyncAggregateWorker); + public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor) + { +this.drillbitContext = drillbitContext; +this.sendingAccountor = sendingAccountor; +asyncAggregateWorker = new AsyncAggregateWorker(); +drillbitContext.getExecutor().submit(asyncAggregateWorker); } - public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { -if
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r236487447 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java ## @@ -136,8 +143,8 @@ public void interrupt(final InterruptedException e) { private final AccountingUserConnection accountingUserConnection; /** Stores constants and their holders by type */ private final Map> constantValueHolderCache; - - private RuntimeFilterSink runtimeFilterSink; + private Map rfIdentifier2RFW = new ConcurrentHashMap<>(); + private Map rfIdentifier2fetched = new ConcurrentHashMap<>(); Review comment: RuntimeFilterRecordBatch should not call get multiple times once it has received a valid RTF. Anyways I guess that's not a big deal and can be refactored in future. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r236397662 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -40,23 +41,18 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; /** * This sink receives the RuntimeFilters from the netty thread, * aggregates them in an async thread, broadcast the final aggregated * one to the RuntimeFilterRecordBatch. */ -public class RuntimeFilterSink +public class RuntimeFilterSink implements Cloneable Review comment: This should implement `Closeable`, not `Cloneable`. Also, there are race conditions similar to the ones pointed out in the original implementation of this class. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r236406316 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -74,32 +70,43 @@ private SendingAccountor sendingAccountor; + private AsyncAggregateWorker asyncAggregateWorker; + + private AtomicBoolean running = new AtomicBoolean(true); + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor) { this.drillbitContext = drillbitContext; this.sendingAccountor = sendingAccountor; -AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); -drillbitContext.getExecutor().submit(asyncAggregateWorker); } public void add(RuntimeFilterWritable runtimeFilterWritable) { +if (asyncAggregateWorker == null) { + asyncAggregateWorker = new AsyncAggregateWorker(); + drillbitContext.getExecutor().submit(asyncAggregateWorker); +} runtimeFilterWritable.retainBuffers(1); int joinMjId = runtimeFilterWritable.getRuntimeFilterBDef().getMajorFragmentId(); if (joinMjId2Stopwatch.get(joinMjId) == null) { Stopwatch stopwatch = Stopwatch.createStarted(); joinMjId2Stopwatch.put(joinMjId, stopwatch); } -queueLock.lock(); -try { +synchronized (rfQueue) { rfQueue.add(runtimeFilterWritable); Review comment: This should check the `running` status before adding to the queue. When `running` is false, the thread should `close` the received filter. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r236396758 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ## @@ -1013,9 +1041,9 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException { } if (spilledState.isFirstCycle() && enableRuntimeFilter) { - if (bloomFilters.size() > 0) { + if (bloomFilter2buildId.size() > 0) { int hashJoinOpId = this.popConfig.getOperatorId(); -runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef(), hashJoinOpId); +runtimeFilterReporter.sendOut(new ArrayList<>(bloomFilter2buildId.keySet()), probeFields, this.popConfig.getRuntimeFilterDef(), hashJoinOpId); Review comment: We create a `BloomFilter` for each build side field. But here, when sending out the filters for all fields, the order of `bloomFilter` and `probeFields` in their respective lists can be different. On the receiving end, the first message can carry its bloom filters in one order relative to the probe fields, whereas the second message can carry them in a different order while the probe fields are still listed in the same order. Hence `aggregator` will aggregate bloom filters that belong to different fields, which can result in wrong results. The same is happening in `applyRuntimeFilter` in the RTF operator, where I guess it's assumed that the order of the probe fields and the bloom filter list is consistent. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r236405542 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -74,32 +70,43 @@ private SendingAccountor sendingAccountor; + private AsyncAggregateWorker asyncAggregateWorker; + + private AtomicBoolean running = new AtomicBoolean(true); + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor) { this.drillbitContext = drillbitContext; this.sendingAccountor = sendingAccountor; -AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); -drillbitContext.getExecutor().submit(asyncAggregateWorker); } public void add(RuntimeFilterWritable runtimeFilterWritable) { +if (asyncAggregateWorker == null) { + asyncAggregateWorker = new AsyncAggregateWorker(); + drillbitContext.getExecutor().submit(asyncAggregateWorker); +} Review comment: Should be done once in constructor. Putting here has issues such that multiple netty threads can create multiple `asyncAggregateWorker` thread and leak all except the last set one. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r236378230 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java ## @@ -76,6 +77,16 @@ public RuntimeFilterRecordBatch(RuntimeFilterPOP pop, RecordBatch incoming, Frag enableRFWaiting = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY).bool_val; maxWaitingTime = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY).num_val; this.rfIdentifier = pop.getIdentifier(); +/** +int majId = context.getHandle().getMajorFragmentId(); +int operatorId = pop.getOperatorId(); +if (majId == 5 && operatorId == 4) { + passed = true; +} +if (majId == 3 && operatorId == 13) { + passed = true; +} + */ Review comment: please remove the commented block and `passed` variable. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r236396536 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ## @@ -747,12 +751,35 @@ private void setupHash64(HashTableConfig htConfig) throws SchemaChangeException enableRuntimeFilter = false; return; } +RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef(); +List bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); +for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { + String buildField = bloomFilterDef.getBuildField(); + SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN); + TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath); + if (typedFieldId == null) { +missingField = true; +break; + } + int fieldId = typedFieldId.getFieldIds()[0]; + String probeField = bloomFilterDef.getProbeField(); + probeFields.add(probeField); + bloomFilterDef2buildId.put(bloomFilterDef, fieldId); +} +if (missingField) { + logger.info("As some build side joint key fields not found, runtime filter was disabled"); + enableRuntimeFilter = false; + return; +} ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, context); try { hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds); } catch (Exception e) { throw new SchemaChangeException("Failed to construct a field's hash64 dynamic codes", e); } +if (runtimeFilterReporter == null) { + runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context); Review comment: With this the `initializeRuntimeFilter` will always return without actually creating BloomFilters. Is this initialization required here ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r236407395 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -202,32 +209,36 @@ public void setJoinMjId2ScanMjId(Map joinMjId2ScanMjId) @Override public void run() { - while (joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty()) { -queueLock.lock(); -RuntimeFilterWritable toAggregate = rfQueue.poll(); -if (toAggregate == null) { - try { -notEmpty.await(); - } - catch (InterruptedException e) { -logger.error("RFW_Aggregator thread being interrupted", e); - } - finally { -queueLock.unlock(); - } -} else { - queueLock.unlock(); + while ((joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty() ) && running.get()) { +RuntimeFilterWritable toAggregate = null; +synchronized (rfQueue) { try { -aggregate(toAggregate); - } - catch (Exception e) { -logger.error("fail to aggregate or route the RFW", e); -throw new RuntimeException(e); - } - finally { -toAggregate.close(); +toAggregate = rfQueue.poll(); +while (toAggregate == null && running.get()) { + rfQueue.wait(); + toAggregate = rfQueue.poll(); +} + } catch (InterruptedException ex) { +logger.error("RFW_Aggregator thread being interrupted", ex); +continue; } } +// perform aggregate outside the sync block. +try { + aggregate(toAggregate); +} catch (Exception ex) { + logger.error("Failed to aggregate or route the RFW", ex); + throw new DrillRuntimeException(ex); +} finally { + toAggregate.close(); Review comment: check if `toAggregate != null` before close. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r236387141 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ## @@ -747,12 +751,35 @@ private void setupHash64(HashTableConfig htConfig) throws SchemaChangeException enableRuntimeFilter = false; return; } +RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef(); +List bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); +for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { + String buildField = bloomFilterDef.getBuildField(); + SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN); + TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath); + if (typedFieldId == null) { +missingField = true; +break; + } + int fieldId = typedFieldId.getFieldIds()[0]; + String probeField = bloomFilterDef.getProbeField(); + probeFields.add(probeField); + bloomFilterDef2buildId.put(bloomFilterDef, fieldId); +} +if (missingField) { + logger.info("As some build side joint key fields not found, runtime filter was disabled"); Review comment: build side **joint** key ---> build side **join** key This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r236407149 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -202,32 +209,36 @@ public void setJoinMjId2ScanMjId(Map joinMjId2ScanMjId) @Override public void run() { - while (joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty()) { -queueLock.lock(); -RuntimeFilterWritable toAggregate = rfQueue.poll(); -if (toAggregate == null) { - try { -notEmpty.await(); - } - catch (InterruptedException e) { -logger.error("RFW_Aggregator thread being interrupted", e); - } - finally { -queueLock.unlock(); - } -} else { - queueLock.unlock(); + while ((joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty() ) && running.get()) { +RuntimeFilterWritable toAggregate = null; +synchronized (rfQueue) { try { -aggregate(toAggregate); - } - catch (Exception e) { -logger.error("fail to aggregate or route the RFW", e); -throw new RuntimeException(e); - } - finally { -toAggregate.close(); +toAggregate = rfQueue.poll(); +while (toAggregate == null && running.get()) { + rfQueue.wait(); + toAggregate = rfQueue.poll(); +} + } catch (InterruptedException ex) { +logger.error("RFW_Aggregator thread being interrupted", ex); +continue; } } +// perform aggregate outside the sync block. +try { + aggregate(toAggregate); Review comment: Check that `toAggregate != null` and only then call `aggregate(toAggregate)`. `toAggregate` can be null in the case when `running` is false. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r236396402 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ## @@ -747,12 +751,35 @@ private void setupHash64(HashTableConfig htConfig) throws SchemaChangeException enableRuntimeFilter = false; return; } +RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef(); Review comment: `runtimeFilterDef` can be null here This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r233325858 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -17,206 +17,218 @@ */ package org.apache.drill.exec.work.filter; -import org.apache.drill.exec.memory.BufferAllocator; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * This sink receives the RuntimeFilters from the netty thread, - * aggregates them in an async thread, supplies the aggregated - * one to the fragment running thread. + * aggregates them in an async thread, broadcast the final aggregated + * one to the RuntimeFilterRecordBatch. 
*/ -public class RuntimeFilterSink implements AutoCloseable { - - private AtomicInteger currentBookId = new AtomicInteger(0); - - private int staleBookId = 0; - - /** - * RuntimeFilterWritable holding the aggregated version of all the received filter - */ - private RuntimeFilterWritable aggregated = null; +public class RuntimeFilterSink +{ private BlockingQueue rfQueue = new LinkedBlockingQueue<>(); - /** - * Flag used by Minor Fragment thread to indicate it has encountered error - */ - private AtomicBoolean running = new AtomicBoolean(true); - - /** - * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this - * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at - * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to - * indicate producer not to put any new elements in it. - */ private ReentrantLock queueLock = new ReentrantLock(); private Condition notEmpty = queueLock.newCondition(); - private ReentrantLock aggregatedRFLock = new ReentrantLock(); + private Map joinMjId2rfNumber; + + //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints + private Map> joinMjId2probeScanEps = new HashMap<>(); + + //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id + private Map joinMjId2ScanMjId = new HashMap<>(); - private BufferAllocator bufferAllocator; + //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable + private Map joinMjId2AggregatedRF = new HashMap<>(); + //for debug usage + private Map joinMjId2Stopwatch = new HashMap<>(); - private Future future; + private DrillbitContext drillbitContext; + + private SendingAccountor sendingAccountor; private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); - public RuntimeFilterSink(BufferAllocator bufferAllocator, 
ExecutorService executorService) { -this.bufferAllocator = bufferAllocator; + public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor) + { +this.drillbitContext = drillbitContext; +this.sendingAccountor = sendingAccountor; AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); -future = executorService.submit(asyncAggregateWorker); +drillbitContext.getExecutor().submit(asyncAggregateWorker); } - public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { -if (running.get()) { - try { -aggregatedRFLock.lock(); -if (containOne()) { - boolean same = aggregated.equals(runtimeFilterWritable); - if (!same) { -// This is to solve the only one fragment case that two RuntimeFilterRecordBatchs -// share the same FragmentContext. -
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r233261620 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -17,206 +17,217 @@ */ package org.apache.drill.exec.work.filter; -import org.apache.drill.exec.memory.BufferAllocator; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * This sink receives the RuntimeFilters from the netty thread, - * aggregates them in an async thread, supplies the aggregated - * one to the fragment running thread. + * aggregates them in an async thread, broadcast the final aggregated + * one to the RuntimeFilterRecordBatch. 
*/ -public class RuntimeFilterSink implements AutoCloseable { - - private AtomicInteger currentBookId = new AtomicInteger(0); - - private int staleBookId = 0; - - /** - * RuntimeFilterWritable holding the aggregated version of all the received filter - */ - private RuntimeFilterWritable aggregated = null; +public class RuntimeFilterSink +{ private BlockingQueue rfQueue = new LinkedBlockingQueue<>(); - /** - * Flag used by Minor Fragment thread to indicate it has encountered error - */ - private AtomicBoolean running = new AtomicBoolean(true); - - /** - * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this - * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at - * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to - * indicate producer not to put any new elements in it. - */ private ReentrantLock queueLock = new ReentrantLock(); private Condition notEmpty = queueLock.newCondition(); - private ReentrantLock aggregatedRFLock = new ReentrantLock(); + private Map joinMjId2rfNumber; + + //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints + private Map> joinMjId2probeScanEps = new HashMap<>(); - private BufferAllocator bufferAllocator; + //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id + private Map joinMjId2ScanMjId = new HashMap<>(); - private Future future; + //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable + private Map joinMjId2AggregatedRF = new HashMap<>(); + //for debug usage + private Map joinMjId2Stopwatch = new HashMap<>(); + + private DrillbitContext drillbitContext; + + private SendingAccountor sendingAccountor; private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); - public RuntimeFilterSink(BufferAllocator bufferAllocator, 
ExecutorService executorService) { -this.bufferAllocator = bufferAllocator; + public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor) + { +this.drillbitContext = drillbitContext; +this.sendingAccountor = sendingAccountor; AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); -future = executorService.submit(asyncAggregateWorker); +drillbitContext.getExecutor().submit(asyncAggregateWorker); } - public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { -if (running.get()) { - try { -aggregatedRFLock.lock(); -if (containOne()) { - boolean same = aggregated.equals(runtimeFilterWritable); - if (!same) { -// This is to solve the only one fragment case that two RuntimeFilterRecordBatchs -// share the same FragmentContext. -
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r233262063 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -17,206 +17,218 @@ */ package org.apache.drill.exec.work.filter; -import org.apache.drill.exec.memory.BufferAllocator; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * This sink receives the RuntimeFilters from the netty thread, - * aggregates them in an async thread, supplies the aggregated - * one to the fragment running thread. + * aggregates them in an async thread, broadcast the final aggregated + * one to the RuntimeFilterRecordBatch. 
*/ -public class RuntimeFilterSink implements AutoCloseable { - - private AtomicInteger currentBookId = new AtomicInteger(0); - - private int staleBookId = 0; - - /** - * RuntimeFilterWritable holding the aggregated version of all the received filter - */ - private RuntimeFilterWritable aggregated = null; +public class RuntimeFilterSink +{ private BlockingQueue rfQueue = new LinkedBlockingQueue<>(); - /** - * Flag used by Minor Fragment thread to indicate it has encountered error - */ - private AtomicBoolean running = new AtomicBoolean(true); - - /** - * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this - * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at - * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to - * indicate producer not to put any new elements in it. - */ private ReentrantLock queueLock = new ReentrantLock(); private Condition notEmpty = queueLock.newCondition(); - private ReentrantLock aggregatedRFLock = new ReentrantLock(); + private Map joinMjId2rfNumber; + + //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints + private Map> joinMjId2probeScanEps = new HashMap<>(); + + //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id + private Map joinMjId2ScanMjId = new HashMap<>(); - private BufferAllocator bufferAllocator; + //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable + private Map joinMjId2AggregatedRF = new HashMap<>(); + //for debug usage + private Map joinMjId2Stopwatch = new HashMap<>(); - private Future future; + private DrillbitContext drillbitContext; + + private SendingAccountor sendingAccountor; private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); - public RuntimeFilterSink(BufferAllocator bufferAllocator, 
ExecutorService executorService) { -this.bufferAllocator = bufferAllocator; + public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor) + { +this.drillbitContext = drillbitContext; +this.sendingAccountor = sendingAccountor; AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); -future = executorService.submit(asyncAggregateWorker); +drillbitContext.getExecutor().submit(asyncAggregateWorker); } - public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { -if (running.get()) { - try { -aggregatedRFLock.lock(); -if (containOne()) { - boolean same = aggregated.equals(runtimeFilterWritable); - if (!same) { -// This is to solve the only one fragment case that two RuntimeFilterRecordBatchs -// share the same FragmentContext. -
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r233207751 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java ## @@ -136,8 +143,8 @@ public void interrupt(final InterruptedException e) { private final AccountingUserConnection accountingUserConnection; /** Stores constants and their holders by type */ private final Map> constantValueHolderCache; - - private RuntimeFilterSink runtimeFilterSink; + private Map rfIdentifier2RFW = new ConcurrentHashMap<>(); + private Map rfIdentifier2fetched = new ConcurrentHashMap<>(); Review comment: The extra decrease is because of this extra [retainBuffers](https://github.com/apache/drill/pull/1504/files#diff-11d71582cb7541a6ace7d9a1d7072c40R374) call. I think we can get rid of that `retainBuffer` call and then we don't have to call `release` twice for unconsumed RuntimeFilterWritable. Consider below flows: - `RuntimeFilterWritable` is received from Netty via `receiveRuntimeFilter` (ref count 1) - `retainBuffers` is called in receiveRuntimeFilter (ref count 2) - `AddRuntimeFilter` is called. Let's not increment the refCount here and just put the filter in the map `rfIdentifier2RFW` (so ref count is still 2) - Netty thread will call release (ref count 1) - Now there can be 2 cases: -- If filter is consumed by RTF operator then on close ref count will be decreased (ref count 0) For the consumed key RuntimeFilterWritable will be removed from map `rfIdentifier2RFW` -- If filter is not consumed then it will still be present in the map and `closeReceivedRFWs` will release the buffer (ref count 0) In another case: - `RuntimeFilterWritable` buffers are created by HashJoin operator (ref count 1) - `addRuntimeFilter` is called by HashJoin operator thread if sendToForeman is set to false. 
(ref count 1) - Now there can be 2 cases: -- If the filter is consumed by the RTF operator, then on close the ref count will be decreased (ref count 0). For the consumed key, the RuntimeFilterWritable will be removed from the map `rfIdentifier2RFW` -- If the filter is not consumed, then it will still be present in the map and `closeReceivedRFWs` will release the buffer (ref count 0) Based on the above, we can also get rid of `rfIdentifier2fetched`, but we have to be extra careful when looking for a RuntimeFilterWritable in `rfIdentifier2RFW`. We have to call `containsKey()` to check first in `getRuntimeFilter` rather than calling `get` on `rfIdentifier2RFW`. Since the map allows null values, a `get` call can create a new key with a null value. Or just implement `closeReceivedRFWs` with a check for null values. ``` private void closeReceivedRFWs() { for (RuntimeFilterWritable runtimeFilterWritable : rfIdentifier2RFW.values()){ if (runtimeFilterWritable == null) { continue; } long rfIdentifier = runtimeFilterWritable.getRuntimeFilterBDef().getRfIdentifier(); runtimeFilterWritable.close(); } } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r233261620 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -17,206 +17,217 @@ */ package org.apache.drill.exec.work.filter; -import org.apache.drill.exec.memory.BufferAllocator; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * This sink receives the RuntimeFilters from the netty thread, - * aggregates them in an async thread, supplies the aggregated - * one to the fragment running thread. + * aggregates them in an async thread, broadcast the final aggregated + * one to the RuntimeFilterRecordBatch. 
*/ -public class RuntimeFilterSink implements AutoCloseable { - - private AtomicInteger currentBookId = new AtomicInteger(0); - - private int staleBookId = 0; - - /** - * RuntimeFilterWritable holding the aggregated version of all the received filter - */ - private RuntimeFilterWritable aggregated = null; +public class RuntimeFilterSink +{ private BlockingQueue rfQueue = new LinkedBlockingQueue<>(); - /** - * Flag used by Minor Fragment thread to indicate it has encountered error - */ - private AtomicBoolean running = new AtomicBoolean(true); - - /** - * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this - * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at - * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to - * indicate producer not to put any new elements in it. - */ private ReentrantLock queueLock = new ReentrantLock(); private Condition notEmpty = queueLock.newCondition(); - private ReentrantLock aggregatedRFLock = new ReentrantLock(); + private Map joinMjId2rfNumber; + + //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints + private Map> joinMjId2probeScanEps = new HashMap<>(); - private BufferAllocator bufferAllocator; + //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id + private Map joinMjId2ScanMjId = new HashMap<>(); - private Future future; + //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable + private Map joinMjId2AggregatedRF = new HashMap<>(); + //for debug usage + private Map joinMjId2Stopwatch = new HashMap<>(); + + private DrillbitContext drillbitContext; + + private SendingAccountor sendingAccountor; private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); - public RuntimeFilterSink(BufferAllocator bufferAllocator, 
ExecutorService executorService) { -this.bufferAllocator = bufferAllocator; + public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor) + { +this.drillbitContext = drillbitContext; +this.sendingAccountor = sendingAccountor; AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); -future = executorService.submit(asyncAggregateWorker); +drillbitContext.getExecutor().submit(asyncAggregateWorker); } - public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { -if (running.get()) { - try { -aggregatedRFLock.lock(); -if (containOne()) { - boolean same = aggregated.equals(runtimeFilterWritable); - if (!same) { -// This is to solve the only one fragment case that two RuntimeFilterRecordBatchs -// share the same FragmentContext. -
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r232866925 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java ## @@ -136,8 +143,8 @@ public void interrupt(final InterruptedException e) { private final AccountingUserConnection accountingUserConnection; /** Stores constants and their holders by type */ private final Map> constantValueHolderCache; - - private RuntimeFilterSink runtimeFilterSink; + private Map rfIdentifier2RFW = new ConcurrentHashMap<>(); + private Map rfIdentifier2fetched = new ConcurrentHashMap<>(); Review comment: The information which `rfIdentifier2fetched` map is storing can be implicitly derived using first map `rfIdentifier2RFW`. Once the `RuntimeFilterWritable` is consumed just remove it from first map. Then in `closeReceivedRFWs` just call `close` on the `RuntimeFilterWritable` which is present. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r232881982 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -17,206 +17,217 @@ */ package org.apache.drill.exec.work.filter; -import org.apache.drill.exec.memory.BufferAllocator; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * This sink receives the RuntimeFilters from the netty thread, - * aggregates them in an async thread, supplies the aggregated - * one to the fragment running thread. + * aggregates them in an async thread, broadcast the final aggregated + * one to the RuntimeFilterRecordBatch. 
*/ -public class RuntimeFilterSink implements AutoCloseable { - - private AtomicInteger currentBookId = new AtomicInteger(0); - - private int staleBookId = 0; - - /** - * RuntimeFilterWritable holding the aggregated version of all the received filter - */ - private RuntimeFilterWritable aggregated = null; +public class RuntimeFilterSink +{ private BlockingQueue rfQueue = new LinkedBlockingQueue<>(); - /** - * Flag used by Minor Fragment thread to indicate it has encountered error - */ - private AtomicBoolean running = new AtomicBoolean(true); - - /** - * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this - * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at - * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to - * indicate producer not to put any new elements in it. - */ private ReentrantLock queueLock = new ReentrantLock(); private Condition notEmpty = queueLock.newCondition(); - private ReentrantLock aggregatedRFLock = new ReentrantLock(); + private Map joinMjId2rfNumber; + + //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints + private Map> joinMjId2probeScanEps = new HashMap<>(); - private BufferAllocator bufferAllocator; + //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id + private Map joinMjId2ScanMjId = new HashMap<>(); - private Future future; + //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable + private Map joinMjId2AggregatedRF = new HashMap<>(); + //for debug usage + private Map joinMjId2Stopwatch = new HashMap<>(); + + private DrillbitContext drillbitContext; + + private SendingAccountor sendingAccountor; private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); - public RuntimeFilterSink(BufferAllocator bufferAllocator, 
ExecutorService executorService) { -this.bufferAllocator = bufferAllocator; + public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor) + { +this.drillbitContext = drillbitContext; +this.sendingAccountor = sendingAccountor; AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); -future = executorService.submit(asyncAggregateWorker); +drillbitContext.getExecutor().submit(asyncAggregateWorker); } - public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { -if (running.get()) { - try { -aggregatedRFLock.lock(); -if (containOne()) { - boolean same = aggregated.equals(runtimeFilterWritable); - if (!same) { -// This is to solve the only one fragment case that two RuntimeFilterRecordBatchs -// share the same FragmentContext. -
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r232526727 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java ## @@ -58,9 +55,14 @@ private Set toAddRuntimeFilter = new HashSet<>(); + private Multimap probeSideScan2hj = HashMultimap.create(); + private double fpp; + private int bloomFilterMaxSizeInBytesDef; + private static final AtomicLong atomicLong = new AtomicLong(); Review comment: please rename this variable to something like `rtfIdCounter`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r232529229 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java ## @@ -100,8 +102,18 @@ public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException { @Override public Prel visitScan(ScanPrel prel, Void value) throws RuntimeException { if (toAddRuntimeFilter.contains(prel)) { - //Spawn a fresh RuntimeFilterPrel over the previous identified probe side scan node. - RuntimeFilterPrel runtimeFilterPrel = new RuntimeFilterPrel(prel); + //Spawn a fresh RuntimeFilterPrel over the previous identified probe side scan node or a runtime filter node. + Collection hashJoinPrels = probeSideScan2hj.get(prel); + RuntimeFilterPrel runtimeFilterPrel = null; + for (HashJoinPrel hashJoinPrel : hashJoinPrels) { +long identifier = atomicLong.incrementAndGet(); + hashJoinPrel.getRuntimeFilterDef().setRuntimeFilterIdentifier(identifier); +if (runtimeFilterPrel == null) { + runtimeFilterPrel = new RuntimeFilterPrel(prel, identifier); +} else { + runtimeFilterPrel = new RuntimeFilterPrel(runtimeFilterPrel, identifier); Review comment: In this case it will generate two RuntimeFilters stacked on top of each other. Shouldn't we instead combine the two RTFs into a single RTF, with bloomFilter aggregation using some AND operation? Example of a case where this may happen: HJ / \ HJ Scan / \ Scan Scan This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r232526049 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java ## @@ -58,12 +60,15 @@ public void sendOut(List bloomFilters, List probeFields, bo for (String probeFiled : probeFields) { builder.addProbeFields(probeFiled); } +boolean comingFromHj = sendToForeman ? false : true; Review comment: It looks like `comingFromHj` is basically `!sendToForeman`, and `sendToForeman` has already been set in the `toForeman` field of `RuntimeFilterBDef`. So the `comingFromHj` field is redundant. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r230527544 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java ## @@ -362,12 +368,35 @@ public boolean isUserAuthenticationEnabled() { @Override public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) { -this.runtimeFilterSink.aggregate(runtimeFilter); +this.runtimeFilterWritable = runtimeFilter; +if (enableRFWaiting) { + condition.signal(); +} + } + + @Override + public RuntimeFilterWritable getRuntimeFilter() { +return runtimeFilterWritable; } @Override - public RuntimeFilterSink getRuntimeFilterSink() { -return runtimeFilterSink; + public RuntimeFilterWritable getRuntimeFilter(long maxWaitTime, TimeUnit timeUnit) { +if (runtimeFilterWritable != null) { + return runtimeFilterWritable; +} +if (enableRFWaiting) { + lock.lock(); + try { +if (runtimeFilterWritable != null) { + condition.await(maxWaitTime, timeUnit); +} + } catch (InterruptedException e) { +logger.debug("Condition was interrupted", e); + } finally { +lock.unlock(); + } +} +return runtimeFilterWritable; Review comment: ``` lockForRF.lock(); try { if (runtimeFilterWritable != null) { return runtimeFilterWritable; } if (enableRFWaiting) { conditionForRF.await(maxWaitTime, timeUnit); } return runtimeFilterWritable; } catch (InterruptedException ex) { logger.debug("Conditional wait for RF was interrupted", ex); } finally { lockForRF.unlock(); } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r230515093 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MurmurHash3.java ## @@ -20,7 +20,6 @@ import io.netty.buffer.DrillBuf; import io.netty.util.internal.PlatformDependent; - Review comment: Still seeing this file. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r230519811 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java ## @@ -115,6 +117,10 @@ private final BufferManager bufferManager; private ExecutorState executorState; private final ExecutionControls executionControls; + private boolean enableRuntimeFilter; + private boolean enableRFWaiting; + private Lock lock; + private Condition condition; Review comment: please consider changing name of variable to `lockForRF` and `conditionForRF` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r230520575 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java ## @@ -362,12 +368,35 @@ public boolean isUserAuthenticationEnabled() { @Override public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) { -this.runtimeFilterSink.aggregate(runtimeFilter); +this.runtimeFilterWritable = runtimeFilter; +if (enableRFWaiting) { + condition.signal(); +} + } + + @Override + public RuntimeFilterWritable getRuntimeFilter() { +return runtimeFilterWritable; Review comment: ``` lockForRF.lock(); try { return runtimeFilterWritable; } finally { lockForRF.unlock(); } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r230527876 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ## @@ -159,18 +158,22 @@ BufferAllocator getNewChildAllocator(final String operatorName, @Override void close(); - - /** - * @return - */ - RuntimeFilterSink getRuntimeFilterSink(); - /** * add a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment * @param runtimeFilter */ public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter); + public RuntimeFilterWritable getRuntimeFilter(); + + /** + * get the RuntimeFilter with a blocking wait, if the waiting option is enabled + * @param maxWaitTime + * @param timeUnit + * @return Review comment: Please add a description for the `@return` tag, otherwise there are warnings for it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r230164571 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java ## @@ -125,33 +128,53 @@ public void waitForComplete() { /** * This method is passively invoked by receiving a runtime filter from the network - * @param runtimeFilterWritable + * + * @param srcRuntimeFilterWritable */ - public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) { -broadcastAggregatedRuntimeFilter(runtimeFilterWritable); + public void register(RuntimeFilterWritable srcRuntimeFilterWritable) { +BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); +int joinMajorId = runtimeFilterB.getMajorFragmentId(); +int buildSideRfNumber; +RuntimeFilterWritable toAggregated; +synchronized (this) { + buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId); + buildSideRfNumber--; + joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber); + toAggregated = joinMjId2AggregatedRF.get(joinMajorId); + if (toAggregated == null) { +toAggregated = srcRuntimeFilterWritable; +toAggregated.retainBuffers(1); Review comment: makes sense. Thanks for explanation This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r230166642 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java ## @@ -263,4 +260,41 @@ public void dump() { + "originalRecordCount={}, batchSchema={}]", container, sv2, toFilterFields, originalRecordCount, incoming.getSchema()); } + + public enum Metric implements MetricDef { +FILTERED_ROWS, APPLIED_TIMES; + +@Override +public int metricId() { + return ordinal(); +} + } + + public void updateStats() { +stats.setLongStat(Metric.FILTERED_ROWS, filteredRows); +stats.setLongStat(Metric.APPLIED_TIMES, appliedTimes); + } + + private void timedWaiting() { +if (!enableRFWaiting || waited) { + return; +} +long startMs = System.currentTimeMillis(); +while (current == null && batchTimes > 0) { Review comment: ahh right. Thanks for explanation. Please add a comment for it that downstream HashJoinBatch prefetches first batch from both sides in buildSchema phase hence waiting is done post that phase. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r230165128 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java ## @@ -125,33 +128,53 @@ public void waitForComplete() { /** * This method is passively invoked by receiving a runtime filter from the network - * @param runtimeFilterWritable + * + * @param srcRuntimeFilterWritable */ - public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) { -broadcastAggregatedRuntimeFilter(runtimeFilterWritable); + public void register(RuntimeFilterWritable srcRuntimeFilterWritable) { +BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); +int joinMajorId = runtimeFilterB.getMajorFragmentId(); +int buildSideRfNumber; +RuntimeFilterWritable toAggregated; +synchronized (this) { + buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId); + buildSideRfNumber--; + joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber); + toAggregated = joinMjId2AggregatedRF.get(joinMajorId); + if (toAggregated == null) { +toAggregated = srcRuntimeFilterWritable; +toAggregated.retainBuffers(1); + } else { +toAggregated.aggregate(srcRuntimeFilterWritable); + } + joinMjId2AggregatedRF.put(joinMajorId, toAggregated); +} +if (buildSideRfNumber == 0) { Review comment: Yes makes sense. Somehow I missed that `buildSideRfNumber` is a local variable. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229909546 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java ## @@ -103,6 +103,27 @@ public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) { return new RuntimeFilterWritable(runtimeFilterBDef, cloned); } + public void retainBuffers(final int increment) { +if (increment <= 0) { + return; +} +for (final DrillBuf buf : data) { + buf.retain(increment); +} + } + + public RuntimeFilterWritable newRuntimeFilterWritable(BufferAllocator bufferAllocator) { Review comment: Please add a TODO comment saying that this method is not used currently, and reference the JIRA number (which you will create) along with an explanation. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229765087 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java ## @@ -134,8 +142,15 @@ private RuntimeFilterDef generateRuntimeFilter(HashJoinPrel hashJoinPrel) { List bloomFilterDefs = new ArrayList<>(); //find the possible left scan node of the left join key -GroupScan groupScan = null; +ScanPrel probeSideScanPrel = null; RelNode left = hashJoinPrel.getLeft(); +RelNode right = hashJoinPrel.getRight(); +ExchangePrel exchangePrel = findRightExchangePrel(right); +if (exchangePrel == null) { + //Does not support the single fragment mode ,that is the right build side + //can only be BroadcastExchangePrel or HashToRandomExchangePrel + return null; +} List leftFields = left.getRowType().getFieldNames(); List leftKeys = hashJoinPrel.getLeftKeys(); RelMetadataQuery metadataQuery = left.getCluster().getMetadataQuery(); Review comment: Yes that was the point why ScanPrel is found for all left keys. Loop should break once we found a ScanPrel for any left key. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229909442 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -115,7 +98,8 @@ public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() { try { aggregatedRFLock.lock(); - return aggregated.duplicate(bufferAllocator); + RuntimeFilterWritable duplicated = aggregated.duplicate(bufferAllocator); Review comment: This class is not required anymore. It's only used in tests where we should replace the reference with correct implementation. Basically update the test implementation of [this method](https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java#L172) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229761029 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java ## @@ -263,4 +260,41 @@ public void dump() { + "originalRecordCount={}, batchSchema={}]", container, sv2, toFilterFields, originalRecordCount, incoming.getSchema()); } + + public enum Metric implements MetricDef { +FILTERED_ROWS, APPLIED_TIMES; + +@Override +public int metricId() { + return ordinal(); +} + } + + public void updateStats() { +stats.setLongStat(Metric.FILTERED_ROWS, filteredRows); +stats.setLongStat(Metric.APPLIED_TIMES, appliedTimes); + } + + private void timedWaiting() { +if (!enableRFWaiting || waited) { + return; +} +long startMs = System.currentTimeMillis(); +while (current == null && batchTimes > 0) { Review comment: What's the reason of waiting after processing first batch ? Why not wait just on condition **(current==null)** ? Also it looks more like the design of this wait should be done using a conditional variable. Where FragmentContext will signal the conditional variable once Filter is set and this minor fragment thread will wait for timeout or signal on that conditional variable. See [here](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Condition.html#await-long-java.util.concurrent.TimeUnit-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229908821 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ## @@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final FragmentHandle handle) { return runningFragments.get(handle); } +/** + * receive the RuntimeFilter thorough the wire + * @param runtimeFilter + */ public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) { BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef(); boolean toForeman = runtimeFilterDef.getToForeman(); QueryId queryId = runtimeFilterDef.getQueryId(); String queryIdStr = QueryIdHelper.getQueryId(queryId); + runtimeFilter.retainBuffers(1); Review comment: Hmm, initially I didn't look at it from the close() call perspective. But even with respect to that, it doesn't look like there will be any lost Ack case, especially just with the change to transfer the buffer. It looks more like a race condition somewhere to me. Can you open a JIRA for this issue - that we should transfer the buffers and look into the hang issue? That way we can look into it after this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229909632 ## File path: exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java ## @@ -310,6 +310,11 @@ public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) { this.runtimeFilterSink.aggregate(runtimeFilter); Review comment: Update this method as RuntimeFilter Sink is not used anymore. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229908466 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java ## @@ -125,33 +128,53 @@ public void waitForComplete() { /** * This method is passively invoked by receiving a runtime filter from the network - * @param runtimeFilterWritable + * + * @param srcRuntimeFilterWritable */ - public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) { -broadcastAggregatedRuntimeFilter(runtimeFilterWritable); + public void register(RuntimeFilterWritable srcRuntimeFilterWritable) { +BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); +int joinMajorId = runtimeFilterB.getMajorFragmentId(); +int buildSideRfNumber; +RuntimeFilterWritable toAggregated; +synchronized (this) { + buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId); + buildSideRfNumber--; + joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber); + toAggregated = joinMjId2AggregatedRF.get(joinMajorId); + if (toAggregated == null) { +toAggregated = srcRuntimeFilterWritable; +toAggregated.retainBuffers(1); Review comment: why we need to `retainBuffers` here again ? Caller is already retaining this buffer once. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229763641 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ## @@ -1222,7 +1222,7 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), "configured output batch size: %d", configuredBatchSize); -enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER); +enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER) && popConfig.getRuntimeFilterDef() != null; Review comment: Is this an error condition when the feature is `enabled` but `runtimeFilterDef` is not present in popConfig ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229754301 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MurmurHash3.java ## @@ -271,6 +270,5 @@ public static int hash32(double val, long seed) { public static int hash32(int start, int end, DrillBuf buffer, int seed) { return murmur3_32(start, end, buffer, seed); } - } Review comment: please remove this file from your commits. there is only extra space added here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229906462 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java ## @@ -125,33 +128,53 @@ public void waitForComplete() { /** * This method is passively invoked by receiving a runtime filter from the network - * @param runtimeFilterWritable + * + * @param srcRuntimeFilterWritable */ - public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) { -broadcastAggregatedRuntimeFilter(runtimeFilterWritable); + public void register(RuntimeFilterWritable srcRuntimeFilterWritable) { +BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); +int joinMajorId = runtimeFilterB.getMajorFragmentId(); +int buildSideRfNumber; +RuntimeFilterWritable toAggregated; +synchronized (this) { + buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId); + buildSideRfNumber--; + joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber); + toAggregated = joinMjId2AggregatedRF.get(joinMajorId); + if (toAggregated == null) { +toAggregated = srcRuntimeFilterWritable; +toAggregated.retainBuffers(1); + } else { +toAggregated.aggregate(srcRuntimeFilterWritable); + } + joinMjId2AggregatedRF.put(joinMajorId, toAggregated); +} +if (buildSideRfNumber == 0) { Review comment: this check needs to happen in `synchronized` block as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229908076 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java ## @@ -125,33 +128,53 @@ public void waitForComplete() { /** * This method is passively invoked by receiving a runtime filter from the network - * @param runtimeFilterWritable + * + * @param srcRuntimeFilterWritable */ - public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) { -broadcastAggregatedRuntimeFilter(runtimeFilterWritable); + public void register(RuntimeFilterWritable srcRuntimeFilterWritable) { +BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); +int joinMajorId = runtimeFilterB.getMajorFragmentId(); +int buildSideRfNumber; +RuntimeFilterWritable toAggregated; +synchronized (this) { + buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId); + buildSideRfNumber--; + joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber); + toAggregated = joinMjId2AggregatedRF.get(joinMajorId); + if (toAggregated == null) { +toAggregated = srcRuntimeFilterWritable; +toAggregated.retainBuffers(1); + } else { +toAggregated.aggregate(srcRuntimeFilterWritable); + } + joinMjId2AggregatedRF.put(joinMajorId, toAggregated); +} +if (buildSideRfNumber == 0) { + joinMjId2AggregatedRF.remove(joinMajorId); + route(toAggregated); +} } - - private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) { + private void route(RuntimeFilterWritable srcRuntimeFilterWritable) { BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); int joinMajorId = runtimeFilterB.getMajorFragmentId(); UserBitShared.QueryId queryId = runtimeFilterB.getQueryId(); List probeFields = runtimeFilterB.getProbeFieldsList(); +List sizeInBytes = runtimeFilterB.getBloomFilterSizeInBytesList(); 
DrillBuf[] data = srcRuntimeFilterWritable.getData(); -List scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId); +List scanNodeEps = joinMjId2probeScanEps.get(joinMajorId); +int scanNodeSize = scanNodeEps.size(); +srcRuntimeFilterWritable.retainBuffers(scanNodeSize - 1); int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId); for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) { BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder(); for (String probeField : probeFields) { builder.addProbeFields(probeField); } - BitData.RuntimeFilterBDef runtimeFilterBDef = builder -.setQueryId(queryId) -.setMajorFragmentId(scanNodeMjId) -.setMinorFragmentId(minorId) -.build(); + BitData.RuntimeFilterBDef runtimeFilterBDef = builder.setQueryId(queryId).setMajorFragmentId(scanNodeMjId).setMinorFragmentId(minorId).setToForeman(false).addAllBloomFilterSizeInBytes(sizeInBytes).build(); Review comment: please fix the indentation here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229501155 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ## @@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final FragmentHandle handle) { return runningFragments.get(handle); } +/** + * receive the RuntimeFilter thorough the wire + * @param runtimeFilter + */ public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) { BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef(); boolean toForeman = runtimeFilterDef.getToForeman(); QueryId queryId = runtimeFilterDef.getQueryId(); String queryIdStr = QueryIdHelper.getQueryId(queryId); + runtimeFilter.retainBuffers(1); Review comment: I looked into the code. It's weird that you are seeing data tunnel being hanged while sending runtimeFilter since for runtime filter it doesn't look like we are using throttling listener based on semaphore count. If ACK is not received the DataTunnel shouldn't hang while sending RuntimeFilter. It will only hang while sending the DataBatches. DataTunnel is created here for sending RuntimeFilter: https://github.com/apache/drill/pull/1504/files#diff-86eb65fdf93f77f95838f885752e660cR191 In `tunnel.sendRuntimeFilter` I don't see anywhere `semaphore.acquire` is called which will result in hang if Ack is not received. https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java#L94 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229143935 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ## @@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final FragmentHandle handle) { return runningFragments.get(handle); } +/** + * receive the RuntimeFilter thorough the wire + * @param runtimeFilter + */ public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) { BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef(); boolean toForeman = runtimeFilterDef.getToForeman(); QueryId queryId = runtimeFilterDef.getQueryId(); String queryIdStr = QueryIdHelper.getQueryId(queryId); + runtimeFilter.retainBuffers(1); Review comment: I will look where the issue is with transfer code since wire protocol should be same in both the case. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229143935 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ## @@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final FragmentHandle handle) { return runningFragments.get(handle); } +/** + * receive the RuntimeFilter thorough the wire + * @param runtimeFilter + */ public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) { BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef(); boolean toForeman = runtimeFilterDef.getToForeman(); QueryId queryId = runtimeFilterDef.getQueryId(); String queryIdStr = QueryIdHelper.getQueryId(queryId); + runtimeFilter.retainBuffers(1); Review comment: I will look where the issue is with transfer code. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229110793 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ## @@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final FragmentHandle handle) { return runningFragments.get(handle); } +/** + * receive the RuntimeFilter thorough the wire + * @param runtimeFilter + */ public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) { BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef(); boolean toForeman = runtimeFilterDef.getToForeman(); QueryId queryId = runtimeFilterDef.getQueryId(); String queryIdStr = QueryIdHelper.getQueryId(queryId); + runtimeFilter.retainBuffers(1); Review comment: When you say it doesn't work what exactly is the issue with this approach ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r226783106 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java ## @@ -125,22 +121,20 @@ public void waitForComplete() { /** * This method is passively invoked by receiving a runtime filter from the network - * @param runtimeFilterWritable + * @param srcRuntimeFilterWritable */ - public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) { -broadcastAggregatedRuntimeFilter(runtimeFilterWritable); - } - - - private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) { + public void route(RuntimeFilterWritable srcRuntimeFilterWritable) { BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); int joinMajorId = runtimeFilterB.getMajorFragmentId(); UserBitShared.QueryId queryId = runtimeFilterB.getQueryId(); List probeFields = runtimeFilterB.getProbeFieldsList(); +List sizeInBytes = runtimeFilterB.getBloomFilterSizeInBytesList(); DrillBuf[] data = srcRuntimeFilterWritable.getData(); List scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId); Review comment: joinMjId2probdeScanEps -> joinMjId2**probe**ScanEps This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r226809347 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java ## @@ -303,10 +332,28 @@ private Wrapper findPhysicalOpContainer(Wrapper wrapper, PhysicalOperator op) { return opContainer; } } -//should not be here -throw new IllegalStateException(String.format("No valid Wrapper found for physicalOperator with id=%d", op.getOperatorId())); +return null; + } + + private Wrapper findRuntimeFilterContainer(Wrapper wrapper, long runtimeFilterIdentifier) { +boolean contain = containsRuntimeFilterPhysicalOperator(wrapper, runtimeFilterIdentifier); +if (contain) { + return wrapper; +} +List dependencies = wrapper.getFragmentDependencies(); +if (CollectionUtils.isEmpty(dependencies)) { + return null; +} +for (Wrapper dependencyWrapper : dependencies) { + Wrapper opContainer = findRuntimeFilterContainer(dependencyWrapper, runtimeFilterIdentifier); + if (opContainer != null) { +return opContainer; + } +} +return null; Review comment: `findRuntimeFilterContainer` and `findPhysicalOpContainer` are doing same thing except they are using different visitor. We can refactor this code as below then caller can call with appropriate visitor. ```suggestion private Wrapper findPhysicalOpContainer(Wrapper wrapper, PhysicalVisitor visitor) { boolean contain = containsPhysicalOperator(visitor); if (contain) { return wrapper; } List dependencies = wrapper.getFragmentDependencies(); if (CollectionUtils.isEmpty(dependencies)) { return null; } for (Wrapper dependencyWrapper : dependencies) { Wrapper opContainer = findPhysicalOpContainer(dependencyWrapper, visitor); if (opContainer != null) { return opContainer; } } return null; } ``` This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r226784282 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java ## @@ -125,22 +121,20 @@ public void waitForComplete() { /** * This method is passively invoked by receiving a runtime filter from the network - * @param runtimeFilterWritable + * @param srcRuntimeFilterWritable */ - public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) { -broadcastAggregatedRuntimeFilter(runtimeFilterWritable); - } - - - private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) { + public void route(RuntimeFilterWritable srcRuntimeFilterWritable) { Review comment: I was thinking maybe we can open an improvement JIRA to do this routing directly from the HashJoin fragment to the probe side RTF operator fragment. We can pass this routing information specific to each HashJoin and its corresponding probe side scan while scheduling the HashJoin MinorFragment. This will help us to save one extra network hop. In future, when there is a mechanism to transfer a data batch between 2 local minor fragments without network transfer, the same optimization will kick in for RuntimeFilter and will avoid any network hop. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r226778025 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ## @@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final FragmentHandle handle) { return runningFragments.get(handle); } +/** + * receive the RuntimeFilter thorough the wire + * @param runtimeFilter + */ public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) { BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef(); boolean toForeman = runtimeFilterDef.getToForeman(); QueryId queryId = runtimeFilterDef.getQueryId(); String queryIdStr = QueryIdHelper.getQueryId(queryId); + runtimeFilter.retainBuffers(1); Review comment: We should transfer and then retain the buffers (dbody) received over the wire. The transfer will help to account for the memory used by RuntimeFilter in the FragmentContext allocator. See [here](https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/IncomingDataBatch.java#L65) for an example. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r226731921 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java ## @@ -134,8 +142,15 @@ private RuntimeFilterDef generateRuntimeFilter(HashJoinPrel hashJoinPrel) { List bloomFilterDefs = new ArrayList<>(); //find the possible left scan node of the left join key -GroupScan groupScan = null; +ScanPrel probeSideScanPrel = null; RelNode left = hashJoinPrel.getLeft(); +RelNode right = hashJoinPrel.getRight(); +ExchangePrel exchangePrel = findRightExchangePrel(right); +if (exchangePrel == null) { + //Does not support the single fragment mode ,that is the right build side + //can only be BroadcastExchangePrel or HashToRandomExchangePrel + return null; +} List leftFields = left.getRowType().getFieldNames(); List leftKeys = hashJoinPrel.getLeftKeys(); RelMetadataQuery metadataQuery = left.getCluster().getMetadataQuery(); Review comment: For line 160, I wanted to understand why we are trying to get the ScanPrel node for each left key. Won't all the left keys be coming from the same ScanPrel, considering we are not handling the blocking operator scenario? Basically, I am trying to understand when there can be 2 source ScanPrel nodes for the set of left keys to a HashJoinPrel. Also, even if there can be multiple source ScanPrel nodes, on line 188 we are only using the last known ScanPrel node. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
sohami commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r226721691 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java ## @@ -160,7 +160,7 @@ private void setupHashHelper() { if (!runtimeFilterSink.containOne()) { return; } -if (runtimeFilterSink.hasFreshOne()) { +if (runtimeFilterSink.hasFreshOne() || current == null) { Review comment: Why is the **current == null** check needed here? By default, current is null only the first time, and hasFreshOne() will also return true in that case if a valid filter is available. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services