sohami commented on a change in pull request #1504: DRILL-6792: Find the right
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r229908076
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
##########
@@ -125,33 +128,53 @@ public void waitForComplete() {
/**
* This method is passively invoked by receiving a runtime filter from the
network
- * @param runtimeFilterWritable
+ *
+ * @param srcRuntimeFilterWritable
*/
- public void registerRuntimeFilter(RuntimeFilterWritable
runtimeFilterWritable) {
- broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
+ public void register(RuntimeFilterWritable srcRuntimeFilterWritable) {
+ BitData.RuntimeFilterBDef runtimeFilterB =
srcRuntimeFilterWritable.getRuntimeFilterBDef();
+ int joinMajorId = runtimeFilterB.getMajorFragmentId();
+ int buildSideRfNumber;
+ RuntimeFilterWritable toAggregated;
+ synchronized (this) {
+ buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId);
+ buildSideRfNumber--;
+ joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber);
+ toAggregated = joinMjId2AggregatedRF.get(joinMajorId);
+ if (toAggregated == null) {
+ toAggregated = srcRuntimeFilterWritable;
+ toAggregated.retainBuffers(1);
+ } else {
+ toAggregated.aggregate(srcRuntimeFilterWritable);
+ }
+ joinMjId2AggregatedRF.put(joinMajorId, toAggregated);
+ }
+ if (buildSideRfNumber == 0) {
+ joinMjId2AggregatedRF.remove(joinMajorId);
+ route(toAggregated);
+ }
}
-
- private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable
srcRuntimeFilterWritable) {
+ private void route(RuntimeFilterWritable srcRuntimeFilterWritable) {
BitData.RuntimeFilterBDef runtimeFilterB =
srcRuntimeFilterWritable.getRuntimeFilterBDef();
int joinMajorId = runtimeFilterB.getMajorFragmentId();
UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
List<String> probeFields = runtimeFilterB.getProbeFieldsList();
+ List<Integer> sizeInBytes = runtimeFilterB.getBloomFilterSizeInBytesList();
DrillBuf[] data = srcRuntimeFilterWritable.getData();
- List<CoordinationProtos.DrillbitEndpoint> scanNodeEps =
joinMjId2probdeScanEps.get(joinMajorId);
+ List<CoordinationProtos.DrillbitEndpoint> scanNodeEps =
joinMjId2probeScanEps.get(joinMajorId);
+ int scanNodeSize = scanNodeEps.size();
+ srcRuntimeFilterWritable.retainBuffers(scanNodeSize - 1);
int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
BitData.RuntimeFilterBDef.Builder builder =
BitData.RuntimeFilterBDef.newBuilder();
for (String probeField : probeFields) {
builder.addProbeFields(probeField);
}
- BitData.RuntimeFilterBDef runtimeFilterBDef = builder
- .setQueryId(queryId)
- .setMajorFragmentId(scanNodeMjId)
- .setMinorFragmentId(minorId)
- .build();
+ BitData.RuntimeFilterBDef runtimeFilterBDef =
builder.setQueryId(queryId).setMajorFragmentId(scanNodeMjId).setMinorFragmentId(minorId).setToForeman(false).addAllBloomFilterSizeInBytes(sizeInBytes).build();
Review comment:
please fix the indentation here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services