weijietong commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf… URL: https://github.com/apache/drill/pull/1504#discussion_r229929162
##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
##########
@@ -125,33 +128,53 @@ public void waitForComplete() {
   /**
    * This method is passively invoked by receiving a runtime filter from the network
-   * @param runtimeFilterWritable
+   *
+   * @param srcRuntimeFilterWritable
    */
-  public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) {
-    broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
+  public void register(RuntimeFilterWritable srcRuntimeFilterWritable) {
+    BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
+    int joinMajorId = runtimeFilterB.getMajorFragmentId();
+    int buildSideRfNumber;
+    RuntimeFilterWritable toAggregated;
+    synchronized (this) {
+      buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId);
+      buildSideRfNumber--;
+      joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber);
+      toAggregated = joinMjId2AggregatedRF.get(joinMajorId);
+      if (toAggregated == null) {
+        toAggregated = srcRuntimeFilterWritable;
+        toAggregated.retainBuffers(1);

Review comment:
The received RF is held until a fixed number of RFs have been aggregated and are ready to send out. If we don't retain the first one, the caller will release it in its finally block, and the reference count of the RF we are holding for later use will drop to zero.
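To illustrate the reference-counting concern in the review comment, here is a minimal, self-contained sketch. It does not use Drill's actual classes: `CountedBuffer`, `Aggregator`, and `heldBufferSurvives` are hypothetical stand-ins for a reference-counted buffer, the router's per-join aggregation map, and the calling code that releases the incoming filter in a `finally` block. It only assumes the usual convention that a buffer starts with a count of 1 and is freed when the count reaches 0.

```java
import java.util.HashMap;
import java.util.Map;

public class RetainSketch {

  /** Hypothetical stand-in for a reference-counted buffer (not Drill's DrillBuf). */
  static final class CountedBuffer {
    private int refCnt = 1; // a freshly received buffer has exactly one owner

    void retain() {
      if (refCnt <= 0) {
        throw new IllegalStateException("buffer already freed");
      }
      refCnt++;
    }

    void release() {
      if (refCnt <= 0) {
        throw new IllegalStateException("buffer already freed");
      }
      refCnt--;
    }

    boolean isFreed() {
      return refCnt == 0;
    }
  }

  /** Hypothetical aggregator that holds on to the first buffer seen per join id. */
  static final class Aggregator {
    private final Map<Integer, CountedBuffer> held = new HashMap<>();
    private final boolean retainFirst;

    Aggregator(boolean retainFirst) {
      this.retainFirst = retainFirst;
    }

    synchronized void register(int joinId, CountedBuffer incoming) {
      if (!held.containsKey(joinId)) {
        held.put(joinId, incoming);
        if (retainFirst) {
          incoming.retain(); // take our own reference before the caller drops its one
        }
      }
    }

    synchronized CountedBuffer heldFor(int joinId) {
      return held.get(joinId);
    }
  }

  /** Simulates the caller: hand the buffer over, then release it in a finally block. */
  static boolean heldBufferSurvives(boolean retainFirst) {
    Aggregator aggregator = new Aggregator(retainFirst);
    CountedBuffer buffer = new CountedBuffer();
    try {
      aggregator.register(7, buffer);
    } finally {
      buffer.release(); // the caller always gives up its reference
    }
    return !aggregator.heldFor(7).isFreed();
  }

  public static void main(String[] args) {
    System.out.println("with retain: " + heldBufferSurvives(true));     // prints "with retain: true"
    System.out.println("without retain: " + heldBufferSurvives(false)); // prints "without retain: false"
  }
}
```

Without the extra `retain()`, the held buffer is already freed by the time later filters arrive to be aggregated into it, which is the situation the comment describes.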