[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r210146731

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java ##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.work.filter.BloomFilter;
+import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch participates
+ * in the HashJoinBatch and can be applied by a RuntimeFilter, it will generate a filtered
+ * SV2, otherwise will generate a same recordCount-originalRecordCount SV2 which will not affect
+ * the Query's performance ,but just do a memory transfer by the later RemovingRecordBatch op.
+ */
+public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch {
+  private SelectionVector2 sv2;
+
+  private ValueVectorHashHelper.Hash64 hash64;
+  private Map field2id = new HashMap<>();
+  private List toFilterFields;
+  private int originalRecordCount;
+  private int recordCount;
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
+
+  public RuntimeFilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
+    super(pop, context, incoming);
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return sv2.getCount();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return sv2;
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    return null;
+  }
+
+  @Override
+  protected IterOutcome doWork() {
+    container.transferIn(incoming.getContainer());
+    originalRecordCount = incoming.getRecordCount();
+    sv2.setOriginalRecordCount(originalRecordCount);
+    try {
+      applyRuntimeFilter();
+    } catch (SchemaChangeException e) {
+      throw new UnsupportedOperationException(e);
+    }
+    return getFinalOutcome(false);
+  }
+
+  @Override
+  public void close() {
+    if (sv2 != null) {
+      sv2.clear();
+    }
+    super.close();
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    if (sv2 != null) {
+      sv2.clear();
+    }
+
+    switch (incoming.getSchema().getSelectionVectorMode()) {
+      case NONE:
+        if (sv2 == null) {
+          sv2 = new SelectionVector2(oContext.getAllocator());
+        }
+        break;
+      case TWO_BYTE:
+        sv2 = new SelectionVector2(oContext.getAllocator());
+        break;
+      case FOUR_BYTE:
+
+      default:
+        throw new UnsupportedOperationException();
+    }
+
+    if (container.isSchemaChanged()) {
+      container.buildSchema(SelectionVectorMode.TWO_BYTE);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   *
+   * @return
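The class comment above describes two outcomes: a filtered SV2 when a runtime filter applies, or an SV2 that selects every row (recordCount equal to originalRecordCount) when it does not. Below is a minimal, self-contained sketch of that selection step. It assumes a simple `BitSet`-backed bloom filter that uses double hashing, and it models the SV2 as a `char[]` of unsigned 16-bit row indexes. `SimpleBloomFilter`, `hash64` and `applyFilter` are hypothetical names. They are not Drill's `BloomFilter` or `ValueVectorHashHelper` API.

```java
import java.util.BitSet;

/**
 * Hypothetical sketch, not Drill code: probe-side rows are kept only if their
 * join-key hash may be present in a bloom filter built from the build side.
 */
public class RuntimeFilterSketch {

  /** Minimal bloom filter over 64-bit hashes, using double hashing (an assumption). */
  public static final class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    public SimpleBloomFilter(int numBits, int numHashes) {
      this.bits = new BitSet(numBits);
      this.numBits = numBits;
      this.numHashes = numHashes;
    }

    public void insert(long hash) {
      for (int i = 0; i < numHashes; i++) {
        bits.set(bitIndex(hash, i));
      }
    }

    /** May return a false positive, never a false negative. */
    public boolean mightContain(long hash) {
      for (int i = 0; i < numHashes; i++) {
        if (!bits.get(bitIndex(hash, i))) {
          return false;
        }
      }
      return true;
    }

    // Derives the i-th bit position from the low and high halves of the hash.
    private int bitIndex(long hash, int i) {
      int h1 = (int) hash;
      int h2 = (int) (hash >>> 32);
      return Math.floorMod(h1 + i * h2, numBits);
    }
  }

  /** Stand-in 64-bit mixer for a join-key value (hypothetical). */
  public static long hash64(long key) {
    long h = key * 0x9E3779B97F4A7C15L;
    return h ^ (h >>> 29);
  }

  /**
   * Writes the indexes of surviving rows into sv2 and returns how many were kept.
   * A null filter keeps every row, so the result equals probeKeys.length.
   */
  public static int applyFilter(long[] probeKeys, SimpleBloomFilter filter, char[] sv2) {
    int selected = 0;
    for (int row = 0; row < probeKeys.length; row++) {
      if (filter == null || filter.mightContain(hash64(probeKeys[row]))) {
        sv2[selected++] = (char) row; // SV2 entries are unsigned 16-bit row indexes
      }
    }
    return selected;
  }
}
```

A bloom filter can produce false positives, so some probe rows with no build-side match may still survive. The join downstream still evaluates the real condition. The filter only reduces how many rows reach it.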
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r210146226

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java ##
@@ -0,0 +1,222 @@
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r209853440

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java ##
@@ -0,0 +1,222 @@
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r209851687

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java ##
@@ -0,0 +1,222 @@
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r209850478

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java ##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.RuntimeFilterPOP;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+import java.io.IOException;
+import java.util.List;
+
+public class RuntimeFilterPrel extends SinglePrel{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterPrel.class);
+
+  public RuntimeFilterPrel(Prel child){
+    super(child.getCluster(), child.getTraitSet(), child);
+  }
+
+  public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+    super(cluster, traits, child);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List inputs) {
+    return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0));
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    RuntimeFilterPOP r = new RuntimeFilterPOP( ((Prel)getInput()).getPhysicalOperator(creator));
+    return creator.addMetadata(this, r);
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.TWO_BYTE;
+  }
+
+  @Override
+  public Prel prepareForLateralUnnestPipeline(List children) {
+    return (Prel) this.copy(this.traitSet, children);

Review comment:
I don't understand the logic here. Unlike `FilterPrel`, `RuntimeFilterPrel` has no condition at planning time. Does this method need to mirror `FilterPrel`?

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r208770741

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ##
@@ -408,6 +424,9 @@ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupExcep
     fragmentsRunner.setFragmentsInfo(work.getFragments(), work.getRootFragment(), work.getRootOperator());
     startQueryProcessing();
+    if (enableRuntimeFilter) {
+      runtimeFilterManager.waitForComplete();

Review comment:
Sorry, that statement is in the wrong place. I will move it to the right position.
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r208770073

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java ##
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.planner.physical.HashAggPrel;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.planner.physical.SortPrel;
+import org.apache.drill.exec.planner.physical.StreamAggPrel;
+import org.apache.drill.exec.planner.physical.TopNPrel;
+import org.apache.drill.exec.planner.physical.visitor.BasePrelVisitor;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
+  private Map> joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+  private Map joinMjId2AggregatedRF = new ConcurrentHashMap<>();
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+
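The `joinMjId2AggregatedRF` field above keeps one aggregated `RuntimeFilterWritable` for each HashJoin major fragment. Below is a minimal sketch of that kind of per-join aggregation. It assumes that each build-side minor fragment sends exactly one partial bloom filter and that all partials share the same size and hash functions, so merging them is a bitwise OR. `RuntimeFilterAggregatorSketch`, `register` and `add` are hypothetical names, and a `BitSet` stands in for the filter payload.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: merge partial bloom filters per HashJoin major fragment id. */
public class RuntimeFilterAggregatorSketch {

  private static final class Pending {
    final BitSet merged = new BitSet();
    int remaining; // partial filters still expected; guarded by this Pending instance

    Pending(int expected) {
      this.remaining = expected;
    }
  }

  private final Map<Integer, Pending> byJoinMajorId = new ConcurrentHashMap<>();

  /** Records how many partial filters the join with this major fragment id will send. */
  public void register(int joinMajorId, int expectedPartials) {
    if (expectedPartials <= 0) {
      throw new IllegalArgumentException("expectedPartials must be positive");
    }
    byJoinMajorId.put(joinMajorId, new Pending(expectedPartials));
  }

  /**
   * ORs one partial filter into the aggregate. Returns the aggregated filter when the
   * last expected partial arrives, otherwise null.
   */
  public BitSet add(int joinMajorId, BitSet partial) {
    Pending pending = byJoinMajorId.get(joinMajorId);
    if (pending == null) {
      throw new IllegalStateException("no pending filter for join major fragment " + joinMajorId);
    }
    synchronized (pending) {
      if (pending.remaining == 0) {
        throw new IllegalStateException("join major fragment " + joinMajorId + " already complete");
      }
      pending.merged.or(partial);
      pending.remaining--;
      if (pending.remaining > 0) {
        return null;
      }
      byJoinMajorId.remove(joinMajorId);
      return pending.merged;
    }
  }
}
```

Partials can arrive on different RPC threads, so all updates for one join are serialized on its `Pending` entry. Only the thread that delivers the last expected partial receives the merged filter, which it could then forward to the probe-side scans.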
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r208168529 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java ## @@ -0,0 +1,666 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.filter; + +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.commons.collections.CollectionUtils; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.physical.PhysicalPlan; + +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; +import org.apache.drill.exec.physical.base.Exchange; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.BroadcastExchange; +import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.planner.fragment.Fragment; +import org.apache.drill.exec.planner.fragment.Wrapper; +import org.apache.drill.exec.planner.physical.HashAggPrel; +import org.apache.drill.exec.planner.physical.HashJoinPrel; +import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.planner.physical.ScanPrel; +import org.apache.drill.exec.planner.physical.StreamAggPrel; +import org.apache.drill.exec.planner.physical.visitor.BasePrelVisitor; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import 
org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.util.Pointer; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class traverses the physical operator tree to find the HashJoin operator + * for which is JPPD (join predicate push down) is possible. The prerequisite to do JPPD + * is: + * 1. The join condition is equality + * 2. The physical join node is a HashJoin one + * 3. The probe side children of the HashJoin node should not contain a blocking operator like HashAgg + */ +public class RuntimeFilterManager { + + private Wrapper rootWrapper; + //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints + private Map> joinMjId2probdeScanEps = new HashMap<>(); + //HashJoin node's major fragment id to its corresponding probe side nodes's number + private Map joinMjId2scanSize = new ConcurrentHashMap<>(); + //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id + private Map joinMjId2ScanMjId = new HashMap<>(); + + private RuntimeFilterWritable aggregatedRuntimeFilter; + + private DrillbitContext drillbitContext; + + private SendingAccountor sendingAccountor = new SendingAccountor(); + + private String lineSeparator; + + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterManager.class); + + /** + * This class maintains context for the runtime join push down's filter management. It + * does a traversal of the physical operators by le
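The three JPPD prerequisites in the `RuntimeFilterManager` Javadoc above can be illustrated with a small sketch. It uses a hypothetical `Node` tree, not Drill's `Prel` or `PhysicalOperator` classes. It walks the probe side (first input) of each hash join looking for a scan, and gives up if it crosses a blocking operator. Treating `HASH_AGG` and `SORT` as the blocking kinds is this sketch's assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified operator tree; not Drill's Prel or PhysicalOperator classes. */
final class JppdEligibilitySketch {
  enum Kind { SCAN, FILTER, PROJECT, SORT, HASH_AGG, HASH_JOIN, MERGE_JOIN }

  static final class Node {
    final Kind kind;
    final boolean equiJoin;   // only meaningful for joins
    final List<Node> inputs;  // for joins: get(0) is the probe side, get(1) the build side

    Node(Kind kind, boolean equiJoin, Node... inputs) {
      this.kind = kind;
      this.equiJoin = equiJoin;
      this.inputs = Arrays.asList(inputs);
    }
  }

  /** Follows probe-side inputs down to a scan; returns null if a blocking operator is crossed. */
  static Node findProbeScan(Node node) {
    switch (node.kind) {
      case SCAN:
        return node;
      case HASH_AGG:
      case SORT:
        return null; // blocking operator: treated as a barrier the runtime filter does not cross
      default:
        return node.inputs.isEmpty() ? null : findProbeScan(node.inputs.get(0));
    }
  }

  /** Collects hash joins with an equality condition whose probe side reaches a scan unblocked. */
  static List<Node> eligibleJoins(Node root) {
    List<Node> result = new ArrayList<>();
    collect(root, result);
    return result;
  }

  private static void collect(Node node, List<Node> result) {
    boolean eligible = node.kind == Kind.HASH_JOIN         // prerequisite 2: a HashJoin node
        && node.equiJoin                                     // prerequisite 1: equality condition
        && !node.inputs.isEmpty()
        && findProbeScan(node.inputs.get(0)) != null;       // prerequisite 3: no blocking operator
    if (eligible) {
      result.add(node);
    }
    for (Node input : node.inputs) {
      collect(input, result);
    }
  }
}
```

The per-join check stays separate from the tree walk, so nested joins on either side are examined independently.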
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r208165293 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ## @@ -408,6 +424,9 @@ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupExcep fragmentsRunner.setFragmentsInfo(work.getFragments(), work.getRootFragment(), work.getRootOperator()); startQueryProcessing(); +if (enableRuntimeFilter) { + runtimeFilterManager.waitForComplete(); Review comment: There are various cases in which the network between the foreman and the receivers can break down. It is also common for a receiver to finish before it receives the bloom filter. So the safe approach is to use the `SendingAccountor` to release the allocated `ByteBuf`s. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
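The pattern this comment describes can be sketched as follows. The foreman counts each asynchronous send. Each callback releases its buffer whether the send succeeded or failed. `waitForComplete()` then blocks until every callback has fired. `InFlightSendTracker`, `FakeBuffer` and `SendDemo` are hypothetical names. This is a sketch of the idea, not Drill's `SendingAccountor` or Netty's `ByteBuf` API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical counter of in-flight sends; not Drill's actual SendingAccountor API. */
final class InFlightSendTracker {
  private final AtomicInteger started = new AtomicInteger();
  private final Semaphore finished = new Semaphore(0);

  void increment() { started.incrementAndGet(); }  // call before each asynchronous send

  void decrement() { finished.release(); }         // call once per send, on success or failure

  /** Blocks until every send started so far has reported back (assumes no sends start concurrently). */
  void waitForComplete() throws InterruptedException {
    finished.acquire(started.getAndSet(0));
  }
}

/** Stand-in for a reference-counted network buffer (hypothetical). */
final class FakeBuffer {
  private final AtomicInteger refCnt = new AtomicInteger(1);
  void release() { refCnt.decrementAndGet(); }
  int refCnt() { return refCnt.get(); }
}

final class SendDemo {
  /** Simulates an asynchronous send whose outcome is reported on another thread. */
  static void sendAsync(FakeBuffer buf, boolean networkOk, InFlightSendTracker tracker, ExecutorService pool) {
    tracker.increment();
    pool.submit(() -> {
      try {
        if (!networkOk) {
          throw new IllegalStateException("connection reset"); // e.g. the receiver already finished
        }
        // ... bytes would be written to the wire here ...
      } catch (IllegalStateException e) {
        // the failure is observed here; cleanup still happens in finally
      } finally {
        buf.release();       // the buffer is freed on success and on failure
        tracker.decrement(); // lets waitForComplete() return
      }
    });
  }

  /** Sends one buffer over a healthy link and one over a broken link, then waits for both. */
  static boolean runDemo() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    InFlightSendTracker tracker = new InFlightSendTracker();
    FakeBuffer ok = new FakeBuffer();
    FakeBuffer broken = new FakeBuffer();
    sendAsync(ok, true, tracker, pool);
    sendAsync(broken, false, tracker, pool);
    try {
      tracker.waitForComplete();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      pool.shutdown();
    }
    return ok.refCnt() == 0 && broken.refCnt() == 0;
  }
}
```

Releasing in `finally` keeps the cleanup independent of the send's outcome. The semaphore lets the caller wait without polling.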
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r208161542 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java ## @@ -0,0 +1,666 @@
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r207868475 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java ## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.memory.BufferAllocator; + +import java.util.Arrays; + + +/** + * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter + * Filters", see <a href="http://algo2.iti.kit.edu/singler/publications/cacheefficientbloomfilters-wea2007.pdf">this paper</a> + * for details, the main theory is to construct tiny bucket bloom filters which benefit to + * the cpu cache and SIMD opcode. + */ + +public class BloomFilter { + // Bytes in a bucket. + private static final int BYTES_PER_BUCKET = 32; + // Minimum bloom filter data size.
+ private static final int MINIMUM_BLOOM_SIZE_IN_BYTES = 256; + + private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE_IN_BYTES = 16 * 1024 * 1024; + + private DrillBuf byteBuf; + + private int numBytes; + + private int mask[] = new int[8]; + + private byte[] tempBucket = new byte[32]; + + + public BloomFilter(int numBytes, BufferAllocator bufferAllocator) { +int size = BloomFilter.adjustByteSize(numBytes); +this.byteBuf = bufferAllocator.buffer(size); +this.numBytes = byteBuf.capacity(); +this.byteBuf.writerIndex(numBytes); + } + + public BloomFilter(int ndv, double fpp, BufferAllocator bufferAllocator) { +int numBytes = BloomFilter.optimalNumOfBytes(ndv, fpp); +int size = BloomFilter.adjustByteSize(numBytes); +this.byteBuf = bufferAllocator.buffer(size); +this.numBytes = byteBuf.capacity(); +this.byteBuf.writerIndex(numBytes); + } + + + public static int adjustByteSize(int numBytes) { +if (numBytes < MINIMUM_BLOOM_SIZE_IN_BYTES) { + numBytes = MINIMUM_BLOOM_SIZE_IN_BYTES; +} + +if (numBytes > DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE_IN_BYTES) { + numBytes = DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE_IN_BYTES; +} + +// 32 bytes alignment, one bucket. +numBytes = (numBytes + 0x1F) & (~0x1F); +return numBytes; + } + + private void setMask(int key) { +//8 odd numbers act as salt value to participate in the computation of the mask. +final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}; + +Arrays.fill(mask, 0); + +for (int i = 0; i < 8; ++i) { + mask[i] = key * SALT[i]; +} + +for (int i = 0; i < 8; ++i) { + mask[i] = mask[i] >> 27; +} + +for (int i = 0; i < 8; ++i) { + mask[i] = 0x1 << mask[i]; +} + } + + /** + * Add an element's hash value to this bloom filter. + * @param hash hash result of element. 
+ */ + public void insert(long hash) { +int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1); +int key = (int) hash; +setMask(key); +int initialStartIndex = bucketIndex * BYTES_PER_BUCKET; +byteBuf.getBytes(initialStartIndex, tempBucket); +for (int i = 0; i < 8; i++) { + //every iterate batch,we set 32 bits + int bitsetIndex = i * 4; + tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) (mask[i] >>> 24)); + tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | (byte) (mask[i] >>> 16)); + tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | (byte) (mask[i] >>> 8)); + tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | (byte) (mask[i])); +} +byteBuf.setBytes(initialStartIndex, tempBucket); + } + + /** + * Determine whether an element is set or not. + * + * @param hash the hash value of element. + * @return false if the element is not set, true if the element is probably set. + */ + public boolean find(long hash) { +int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1); +int key = (int) hash; +setMask(key); + +int startIndex = bucketIndex * BYTES
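The Javadoc describes a blocked Bloom filter in the style of Putze et al. In the quoted code, the high 32 bits of the hash pick one 32-byte bucket. The low 32 bits, multiplied by eight odd salts, pick one bit in each of the bucket's eight 32-bit words. Below is a minimal sketch of that idea over a plain `int[]` instead of a `DrillBuf`. The class name and storage layout are assumptions, not Drill's code. It reuses the salt constants from `setMask`, with one deliberate difference: it rounds the bucket count up to a power of two. The reason is that masking with `buckets - 1` only reaches every bucket when `buckets` is a power of two.

```java
/** Hypothetical split-block Bloom filter over an int[]; a sketch, not Drill's BloomFilter class. */
final class BlockedBloomSketch {
  private static final int WORDS_PER_BUCKET = 8;          // 8 x 32-bit words = 32 bytes per bucket
  private static final int MIN_BYTES = 256;
  private static final int MAX_BYTES = 16 * 1024 * 1024;
  // Same odd salt constants as the quoted setMask().
  private static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
                                     0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};

  private final int[] words;
  private final int bucketMask;

  BlockedBloomSketch(int requestedBytes) {
    int bytes = Math.max(MIN_BYTES, Math.min(MAX_BYTES, requestedBytes));
    int buckets = Integer.highestOneBit((bytes + 31) / 32); // largest power of two <= ceil(bytes / 32)
    if (buckets * 32 < bytes) {
      buckets <<= 1;                                        // round up so capacity >= bytes
    }
    this.words = new int[buckets * WORDS_PER_BUCKET];
    this.bucketMask = buckets - 1;
  }

  /** One bit per word, chosen by the top 5 bits of key * salt. */
  private static int[] mask(int key) {
    int[] m = new int[WORDS_PER_BUCKET];
    for (int i = 0; i < WORDS_PER_BUCKET; i++) {
      // (x >>> 27) selects the same bit as the quoted (x >> 27), because Java masks shift distances to 5 bits
      m[i] = 1 << ((key * SALT[i]) >>> 27);
    }
    return m;
  }

  void insert(long hash) {
    int base = ((int) (hash >>> 32) & bucketMask) * WORDS_PER_BUCKET; // high 32 bits pick the bucket
    int[] m = mask((int) hash);                                        // low 32 bits pick the bits
    for (int i = 0; i < WORDS_PER_BUCKET; i++) {
      words[base + i] |= m[i];
    }
  }

  /** False means definitely absent; true means possibly present. */
  boolean find(long hash) {
    int base = ((int) (hash >>> 32) & bucketMask) * WORDS_PER_BUCKET;
    int[] m = mask((int) hash);
    for (int i = 0; i < WORDS_PER_BUCKET; i++) {
      if ((words[base + i] & m[i]) == 0) {
        return false;
      }
    }
    return true;
  }

  int sizeInBytes() { return words.length * 4; }
}
```

Every probe touches a single 32-byte bucket. That cache-friendliness is the main point of the blocked design, at the cost of a slightly higher false-positive rate than a classic Bloom filter of the same size.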
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r207867048 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ## @@ -190,11 +213,21 @@ public IterOutcome next() { if (isNewSchema) { // Even when recordCount = 0, we should return return OK_NEW_SCHEMA if current reader presents a new schema. // This could happen when data sources have a non-trivial schema with 0 row. - container.buildSchema(SelectionVectorMode.NONE); + if (firstRuntimeFiltered) { +container.buildSchema(SelectionVectorMode.TWO_BYTE); +runtimeFiltered = true; + } else { +container.buildSchema(SelectionVectorMode.NONE); + } Review comment: I will take this suggestion into account.
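In the hunk above, a runtime-filtered scan builds its schema with `SelectionVectorMode.TWO_BYTE` instead of `NONE`. Downstream operators then read the batch through a two-byte selection vector (SV2), which lists the indexes of the surviving rows rather than copying them. Below is a minimal sketch of building such an index array. A `LongPredicate` stands in for the bloom-filter probe, and a `char[]` holds the unsigned 16-bit indexes. Both are assumptions; this is not Drill's `SelectionVector2` API.

```java
import java.util.Arrays;
import java.util.function.LongPredicate;

/** Hypothetical SV2-style filter: keeps the indexes of rows whose key hash may be on the build side. */
final class Sv2Sketch {
  /** Returns the indexes (as unsigned 16-bit chars) of rows that pass the probe. */
  static char[] select(long[] rowHashes, LongPredicate mightContain) {
    if (rowHashes.length > 65536) {
      throw new IllegalArgumentException("a two-byte selection vector addresses at most 65536 rows");
    }
    char[] sv2 = new char[rowHashes.length];
    int count = 0;
    for (int row = 0; row < rowHashes.length; row++) {
      if (mightContain.test(rowHashes[row])) {
        sv2[count++] = (char) row;      // record the row index; the row data itself is not copied
      }
    }
    return Arrays.copyOf(sv2, count);   // length == number of selected records
  }
}
```

When nothing is filtered, the vector simply lists every index. That matches the head's note that an unfiltered SV2 only costs a later memory transfer.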
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r207866392 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ## @@ -408,6 +424,9 @@ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupExcep fragmentsRunner.setFragmentsInfo(work.getFragments(), work.getRootFragment(), work.getRootOperator()); startQueryProcessing(); +if (enableRuntimeFilter) { + runtimeFilterManager.waitForComplete(); Review comment: To make sure the `ByteBuf`s sent out by the `RuntimeFilterManager` are safely released whether the network send succeeds or fails.
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r207860831 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java ## @@ -0,0 +1,666 @@
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r207836945 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java ## @@ -0,0 +1,666 @@
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r206498489 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.memory.BufferAllocator; + +public class BloomFilterCreator { Review comment: This description will be added to `ScanBatch`'s `applyRuntimeFilter`.
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r206055252 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.memory.BufferAllocator; + +public class BloomFilterCreator { Review comment: Makes sense.
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r206054896 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import java.util.Arrays; + + +/** + * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter + * Filters", the main theory is to construct tiny bucket bloom filters which + * benefit to the cpu cache and SIMD opcode. + */ + +public class BloomFilter { + // Bytes in a bucket. + private static final int BYTES_PER_BUCKET = 32; + // Minimum bloom filter data size. 
+ private static final int MINIMUM_BLOOM_SIZE = 256; + + private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; + + private DrillBuf byteBuf; + + private int numBytes; + + private int mask[] = new int[8]; + + private byte[] tempBucket = new byte[32]; + + + public BloomFilter(DrillBuf byteBuf) { +this.byteBuf = byteBuf; +this.numBytes = byteBuf.capacity(); +this.byteBuf.writerIndex(numBytes); Review comment: The DrillBuf's content is read and written only through the absolute-index getBytes and setBytes methods, which do not change the readerIndex or writerIndex. When the BloomFilter is created, the DrillBuf's size is already fixed, so setting the writerIndex here in advance is clear and safe.
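To illustrate why absolute-index access leaves the buffer's indices alone, here is a minimal sketch that uses `java.nio.ByteBuffer` as a dependency-free stand-in for DrillBuf (an assumption: the analogy is that `ByteBuffer`'s absolute `get(int)`/`put(int, byte)` do not move `position`, just as Netty's absolute `getBytes`/`setBytes` do not move `readerIndex`/`writerIndex`). The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical stand-in: java.nio.ByteBuffer instead of DrillBuf, so the sketch
 * has no external dependencies. Absolute-index accessors read and write bucket
 * bytes without moving the buffer's position.
 */
final class AbsoluteIndexDemo {
  static final int BYTES_PER_BUCKET = 32;

  /** ORs `bits` into byte `offset` of bucket `bucket`, using absolute indexing only. */
  static void orByte(ByteBuffer buf, int bucket, int offset, byte bits) {
    int index = bucket * BYTES_PER_BUCKET + offset;
    buf.put(index, (byte) (buf.get(index) | bits));
  }

  /** Performs a few absolute writes and returns the position, which they never move. */
  static int positionAfterWrites(int numBytes) {
    ByteBuffer buf = ByteBuffer.allocate(numBytes);
    orByte(buf, 0, 0, (byte) 0x01);
    orByte(buf, 1, 3, (byte) 0x80);
    return buf.position();
  }
}
```

Because every access names its index explicitly, the position (or, for DrillBuf, the writerIndex set once in the constructor) stays where it was put.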
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r206017040 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ## @@ -528,6 +590,27 @@ private void initializeBuild() { } + private void initializeRuntimeFilter() { +if (!enableRuntimeFilter) { + return; +} +if (runtimeFilterReporter != null) { + return; +} +runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context); +RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef(); +if (runtimeFilterDef != null) { Review comment: `enableRuntimeFilter` is a global option that decides whether a HashJoin may generate a RuntimeFilter, but it does not guarantee that the HashJoin has a RuntimeFilterDef. `RuntimeFilterManager` defines additional preconditions; for example, a left join is not allowed to use a BF to filter data. So we can't use `Preconditions.checkState(runtimeFilterDef != null)`.
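A hypothetical sketch of why a non-null check cannot be asserted: the option only enables the feature, while the planner may still decline to attach a filter definition. All names here (`JoinKind`, `RuntimeFilterDefStub`, `planRuntimeFilter`, `willBuildFilter`) are illustrative stand-ins, not Drill's classes, and the only exclusion modeled is the left-join case mentioned in the comment.

```java
/**
 * Hypothetical sketch of the guard logic; not Drill's actual classes.
 */
final class RuntimeFilterGuardSketch {
  enum JoinKind { INNER, LEFT }

  static final class RuntimeFilterDefStub { }

  /** Planner side: a def is attached only when the join type permits filtering the probe side. */
  static RuntimeFilterDefStub planRuntimeFilter(boolean enableRuntimeFilter, JoinKind kind) {
    if (!enableRuntimeFilter) {
      return null;
    }
    // A LEFT join must keep every probe-side row, so no filter def is attached.
    if (kind == JoinKind.LEFT) {
      return null;
    }
    return new RuntimeFilterDefStub();
  }

  /** Operator side: a null def is a legal state even when the option is on. */
  static boolean willBuildFilter(boolean enableRuntimeFilter, JoinKind kind) {
    if (!enableRuntimeFilter) {
      return false;
    }
    RuntimeFilterDefStub def = planRuntimeFilter(enableRuntimeFilter, kind);
    // Asserting def != null here would fail for LEFT joins, hence the plain null check.
    return def != null;
  }
}
```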
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r206015829 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.memory.BufferAllocator; + +public class BloomFilterCreator { Review comment: By the way, if there are too many multi-column joins, we can add an option to limit the number of generated BloomFilters. That is, there is no need to generate one BF per join column; we can choose not to generate some BFs to save CPU and memory.
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r206014664 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import java.util.Arrays; + + +/** + * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter + * Filters", the main theory is to construct tiny bucket bloom filters which + * benefit to the cpu cache and SIMD opcode. + */ + +public class BloomFilter { + // Bytes in a bucket. + private static final int BYTES_PER_BUCKET = 32; + // Minimum bloom filter data size. + private static final int MINIMUM_BLOOM_SIZE = 256; + + private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; + + private DrillBuf byteBuf; + + private int numBytes; + + private int mask[] = new int[8]; + + private byte[] tempBucket = new byte[32]; Review comment: `DrillBuf.getBytes(int index, byte[] dst)` overwrites the dst array's content, and we always invoke getBytes before using the dst array, so there's no risk of using a polluted byte array.
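A minimal sketch of the scratch-bucket pattern described above, assuming a plain `byte[]` backing store in place of DrillBuf and `System.arraycopy` in place of `getBytes`/`setBytes`. The scratch array is a member so frequent calls do not allocate, and every read copies a full bucket into it first, so leftovers from an earlier call can never be observed. The class and method names are hypothetical.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch: byte[] backing store instead of DrillBuf. The reused
 * scratch bucket is always fully overwritten before it is read.
 */
final class ScratchBucketSketch {
  static final int BYTES_PER_BUCKET = 32;
  private final byte[] data;
  // Reused across calls to avoid per-call allocation.
  private final byte[] tempBucket = new byte[BYTES_PER_BUCKET];

  ScratchBucketSketch(int numBuckets) {
    data = new byte[numBuckets * BYTES_PER_BUCKET];
  }

  /** Copies one whole bucket into the scratch array, overwriting all 32 bytes. */
  private void loadBucket(int bucket) {
    System.arraycopy(data, bucket * BYTES_PER_BUCKET, tempBucket, 0, BYTES_PER_BUCKET);
  }

  /** Sets bit `bit` (0..255) of a bucket via read-modify-write through the scratch array. */
  void setBit(int bucket, int bit) {
    loadBucket(bucket);
    tempBucket[bit / 8] |= (byte) (1 << (bit % 8));
    System.arraycopy(tempBucket, 0, data, bucket * BYTES_PER_BUCKET, BYTES_PER_BUCKET);
  }

  /** Returns a copy of the bucket as seen through the scratch array. */
  byte[] readBucket(int bucket) {
    loadBucket(bucket);
    return Arrays.copyOf(tempBucket, BYTES_PER_BUCKET);
  }
}
```

After `setBit(0, 255)` the scratch array still holds bucket 0, yet `readBucket(1)` returns all zeros because the load replaces every byte.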
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r206014055 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.memory.BufferAllocator; + +public class BloomFilterCreator { Review comment: The two separate BFs are never intersected directly. The intersection semantics are implemented indirectly at the probe-side `ScanBatch`: each BF filters its probe-side column independently, and together they set a BitSet that marks the rows that may match the join condition. This code is in the `ScanBatch`'s `applyRuntimeFilter` and `computeBitSet` methods.
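A sketch of how independent per-column filters can yield intersection semantics on the probe side. It assumes each filter is exposed as a `LongPredicate` over a column's hash values (a stand-in for `BloomFilter.find`); `ProbeSideFilterSketch` and `candidateRows` are hypothetical names and do not reproduce the actual `applyRuntimeFilter`/`computeBitSet` code.

```java
import java.util.BitSet;
import java.util.List;
import java.util.function.LongPredicate;

/**
 * Hypothetical sketch: each join column has its own filter; a row survives
 * only if every column's filter reports a possible match.
 */
final class ProbeSideFilterSketch {
  /**
   * columns.get(c)[row] is the hash of `row` in join column c, and
   * filters.get(c) is the filter built for that column.
   */
  static BitSet candidateRows(List<long[]> columns, List<LongPredicate> filters, int rowCount) {
    BitSet result = new BitSet(rowCount);
    result.set(0, rowCount); // every row starts as a candidate
    for (int c = 0; c < columns.size(); c++) {
      long[] hashes = columns.get(c);
      LongPredicate mightContain = filters.get(c);
      BitSet columnBits = new BitSet(rowCount);
      for (int row = 0; row < rowCount; row++) {
        if (mightContain.test(hashes[row])) {
          columnBits.set(row);
        }
      }
      // The intersection happens on the row bitmap, never between the filters themselves.
      result.and(columnBits);
    }
    return result;
  }
}
```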
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r204973201 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java ## @@ -0,0 +1,586 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.filter; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.commons.collections.CollectionUtils; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.physical.PhysicalPlan; + +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; +import org.apache.drill.exec.physical.base.Exchange; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.BroadcastExchange; +import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.planner.fragment.Fragment; +import org.apache.drill.exec.planner.fragment.Wrapper; +import org.apache.drill.exec.planner.physical.HashJoinPrel; +import org.apache.drill.exec.planner.physical.ScanPrel; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.util.Pointer; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class traverses the physical operator tree to find the HashJoin operator + * for which is JPPD (join predicate push down) is possible. The prerequisite to do JPPD + * is: + * 1. The join condition is equality + * 2. The physical join node is a HashJoin one + * 3. The probe side children of the HashJoin node should not contain a blocking operator like HashAgg + */ +public class RuntimeFilterManager { + + private Wrapper rootWrapper; + //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints + private Map> joinMjId2probdeScanEps = new HashMap<>(); + //HashJoin node's major fragment id to its corresponding probe side nodes's number + private Map joinMjId2scanSize = new ConcurrentHashMap<>(); + //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id + private Map joinMjId2ScanMjId = new HashMap<>(); + + private RuntimeFilterWritable aggregatedRuntimeFilter; + + private DrillbitContext drillbitContext; + + private SendingAccountor sendingAccountor = new SendingAccountor(); + + private String lineSeparator; + + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterManager.class); + + /** + * This class maintains context for the runtime join push down's filter management. It + * does a traversal of the physical operators by leveraging the root wrapper which indirectly + * holds the global PhysicalOperator tree and contains the minor fragment endpoints. + * @param workUnit + * @param drillbitContext + */ + public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext drillbitContext) { +this.rootWrapper = workUnit.getRootWrapper(); +this.dril
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r205965375 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.memory.BufferAllocator; + +public class BloomFilterCreator { Review comment: After rethinking this point, I think the current strategy of one bloom filter per join column is right, and I prefer it. Taking your example, assume t1.a1 is a low-cardinality column (say, a sex column) and t1.b1 is a high-cardinality column. With a single bloom filter over multiple join columns, the required memory grows in proportion to the joint cardinality of the two columns, which is not acceptable. With the current implementation, the low-cardinality join column consumes only a small amount of memory, while the high-cardinality column consumes its own independent bloom filter.
For the high-cardinality case, we could later add an option to decide whether that bloom filter should be pushed down.
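The memory argument can be made concrete with worked arithmetic. This sketch assumes the textbook bloom filter sizing formula m = -n ln(p) / (ln 2)^2 bits (n distinct values, p false-positive probability); the PR's own sizing method is not shown in this thread, and the cardinalities are illustrative, with `ndvA * ndvB` taken as the worst case for distinct pairs.

```java
/**
 * Hypothetical worked example using the textbook sizing formula
 * m = -n * ln(p) / (ln 2)^2 bits.
 */
final class BloomSizingSketch {
  static long optimalBits(long ndv, double fpp) {
    return (long) Math.ceil(-ndv * Math.log(fpp) / (Math.log(2) * Math.log(2)));
  }

  /** One filter per join column, each sized for that column's own cardinality. */
  static long perColumnBits(long ndvA, long ndvB, double fpp) {
    return optimalBits(ndvA, fpp) + optimalBits(ndvB, fpp);
  }

  /** One filter over (a, b) pairs, sized for the worst case of ndvA * ndvB distinct pairs. */
  static long combinedBits(long ndvA, long ndvB, double fpp) {
    return optimalBits(ndvA * ndvB, fpp);
  }
}
```

With `ndvA = 2`, `ndvB = 1_000_000` and `fpp = 0.01`, the per-column total is about 9.6 million bits (the 2-value column costs only about 20 of them), while the combined filter needs about 19.2 million bits, roughly twice as much.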
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r205939854 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.memory.BufferAllocator; + +public class BloomFilterCreator { + + public static BloomFilter spawnOne(int numBytes, BufferAllocator bufferAllocator) + { +int size = BloomFilter.adjustByteSize(numBytes); +DrillBuf drillBuf = bufferAllocator.buffer(size); +BloomFilter bloomFilter = new BloomFilter(drillBuf); +return bloomFilter; + } + + public static BloomFilter spawnOne(int ndv, double fpp, BufferAllocator bufferAllocator) + { +int numBytes = BloomFilter.optimalNumOfBits(ndv, fpp); Review comment: Will rename the `optimalNumOfBits` method to `optimalNumOfBytes` and divide its return value by 8.
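A sketch of the planned `optimalNumOfBytes` followed by the sizing step in `spawnOne`. The bit count uses the textbook formula m = -n ln(p) / (ln 2)^2 (an assumption; the PR's `optimalNumOfBits` body is not shown here), and `adjustByteSize` reproduces the clamping and 32-byte alignment quoted in this thread.

```java
/**
 * Hypothetical sketch: textbook bit count divided by 8, then the
 * clamp-and-align logic of adjustByteSize as quoted in this thread.
 */
final class BloomByteSizing {
  private static final int MINIMUM_BLOOM_SIZE = 256;
  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;

  /** Optimal size in bytes: bits from the textbook formula, divided by 8 and rounded up. */
  static int optimalNumOfBytes(long ndv, double fpp) {
    double bits = -ndv * Math.log(fpp) / (Math.log(2) * Math.log(2));
    return (int) Math.min(Integer.MAX_VALUE, (long) Math.ceil(bits / 8));
  }

  /** Clamps to [256 B, 16 MiB], then rounds up to a whole 32-byte bucket. */
  static int adjustByteSize(int numBytes) {
    numBytes = Math.max(numBytes, MINIMUM_BLOOM_SIZE);
    numBytes = Math.min(numBytes, DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE);
    return (numBytes + 0x1F) & ~0x1F;
  }
}
```

For 1,000,000 distinct values at a 1% false-positive rate, this gives about 1.2 MB before alignment. One caveat, based on the code quoted in this thread: `insert` and `find` pick a bucket with `& (numBytes / BYTES_PER_BUCKET - 1)`, which reaches every bucket only when the bucket count is a power of two. Aligning to 32 bytes alone does not guarantee that; for example, 288 bytes gives 9 buckets, and the mask 8 selects only buckets 0 and 8.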
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r205939823 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import java.util.Arrays; + + +/** + * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter + * Filters", the main theory is to construct tiny bucket bloom filters which + * benefit to the cpu cache and SIMD opcode. + */ + +public class BloomFilter { + // Bytes in a bucket. + private static final int BYTES_PER_BUCKET = 32; + // Minimum bloom filter data size. 
+ private static final int MINIMUM_BLOOM_SIZE = 256; + + private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; + + private DrillBuf byteBuf; + + private int numBytes; + + private int mask[] = new int[8]; + + private byte[] tempBucket = new byte[32]; + + + public BloomFilter(DrillBuf byteBuf) { +this.byteBuf = byteBuf; +this.numBytes = byteBuf.capacity(); +this.byteBuf.writerIndex(numBytes); + } + + + public static int adjustByteSize(int numBytes) { +if (numBytes < MINIMUM_BLOOM_SIZE) { + numBytes = MINIMUM_BLOOM_SIZE; +} + +if (numBytes > DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE) { + numBytes = DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE; +} + +// 32 bytes alignment, one bucket. +numBytes = (numBytes + 0x1F) & (~0x1F); +return numBytes; + } + + private void setMask(int key) { +final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}; + +Arrays.fill(mask, 0); + +for (int i = 0; i < 8; ++i) { + mask[i] = key * SALT[i]; +} + +for (int i = 0; i < 8; ++i) { + mask[i] = mask[i] >> 27; +} + +for (int i = 0; i < 8; ++i) { + mask[i] = 0x1 << mask[i]; +} + } + + /** + *Add an element's hash value to this bloom filter. + * @param hash hash result of element. 
+ */ + public void insert(long hash) { +int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1); +int key = (int) hash; +setMask(key); +int initialStartIndex = bucketIndex * BYTES_PER_BUCKET; +byteBuf.getBytes(initialStartIndex, tempBucket); +for (int i = 0; i < 8; i++) { + //every iterate batch,we set 32 bits + int bitsetIndex = i * 4; + tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) (mask[i] >>> 24)); + tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | (byte) (mask[i] >>> 16)); + tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | (byte) (mask[i] >>> 8)); + tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | (byte) (mask[i])); +} +byteBuf.setBytes(initialStartIndex, tempBucket); + } + + /** + * Determine whether an element is set or not. + * + * @param hash the hash value of element. + * @return false if the element is not set, true if the element is probably set. + */ + public boolean find(long hash) { +int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1); +int key = (int) hash; +setMask(key); + +int startIndex = bucketIndex * BYTES_PER_BUCKET; +byteBuf.getBytes(startIndex, tempBucket); +for (int i = 0; i < 8; i++) { + byte set = 0; + int bitsetIndex = i * 4; + set |= tempBucket[bitsetIndex] & ((byte) (mask[i] >>> 24)); + set |= tempBucket[(bitsetIndex + 1)] & ((byte) (mask[i] >>> 16)); + set |= tempBucket[(bitsetIndex + 2)] & ((byte) (mask[i] >>> 8)); + set |= tempBucket[(bitsetIndex + 3)] & ((byte) mask[i]); + if (0 == set) { +return false; + } +} +return true; + } + + /** + * Merge this bloom filter with other one + * @param other + */ + public void or(BloomFilter other) { +int otherLength = other.byteBuf.capacity(); +int thisLength = this.byteBuf.capacity(); +assert otherLength == thisLength; +//to avoid checking times of Byte
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r205938739 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import java.util.Arrays; + + +/** + * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter + * Filters", the main theory is to construct tiny bucket bloom filters which + * benefit to the cpu cache and SIMD opcode. + */ + +public class BloomFilter { + // Bytes in a bucket. + private static final int BYTES_PER_BUCKET = 32; + // Minimum bloom filter data size. + private static final int MINIMUM_BLOOM_SIZE = 256; + + private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; + + private DrillBuf byteBuf; + + private int numBytes; + + private int mask[] = new int[8]; + + private byte[] tempBucket = new byte[32]; Review comment: We keep it as a member variable to avoid heap allocation and the resulting GC pressure, since the corresponding methods are invoked very frequently.
Moving it into those methods as a local variable would hurt performance.
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r205938434 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import java.util.Arrays; + + +/** + * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter + * Filters", the main theory is to construct tiny bucket bloom filters which + * benefit to the cpu cache and SIMD opcode. + */ + +public class BloomFilter { + // Bytes in a bucket. + private static final int BYTES_PER_BUCKET = 32; + // Minimum bloom filter data size. 
+ private static final int MINIMUM_BLOOM_SIZE = 256; + + private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; + + private DrillBuf byteBuf; + + private int numBytes; + + private int mask[] = new int[8]; + + private byte[] tempBucket = new byte[32]; + + + public BloomFilter(DrillBuf byteBuf) { +this.byteBuf = byteBuf; +this.numBytes = byteBuf.capacity(); +this.byteBuf.writerIndex(numBytes); + } + + + public static int adjustByteSize(int numBytes) { +if (numBytes < MINIMUM_BLOOM_SIZE) { + numBytes = MINIMUM_BLOOM_SIZE; +} + +if (numBytes > DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE) { + numBytes = DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE; +} + +// 32 bytes alignment, one bucket. +numBytes = (numBytes + 0x1F) & (~0x1F); +return numBytes; + } + + private void setMask(int key) { +final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}; + +Arrays.fill(mask, 0); + +for (int i = 0; i < 8; ++i) { + mask[i] = key * SALT[i]; +} + +for (int i = 0; i < 8; ++i) { + mask[i] = mask[i] >> 27; +} + +for (int i = 0; i < 8; ++i) { + mask[i] = 0x1 << mask[i]; +} + } + + /** + *Add an element's hash value to this bloom filter. + * @param hash hash result of element. 
+ */ + public void insert(long hash) { +int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1); +int key = (int) hash; +setMask(key); +int initialStartIndex = bucketIndex * BYTES_PER_BUCKET; +byteBuf.getBytes(initialStartIndex, tempBucket); +for (int i = 0; i < 8; i++) { + //every iterate batch,we set 32 bits + int bitsetIndex = i * 4; + tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) (mask[i] >>> 24)); + tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | (byte) (mask[i] >>> 16)); + tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | (byte) (mask[i] >>> 8)); + tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | (byte) (mask[i])); +} +byteBuf.setBytes(initialStartIndex, tempBucket); + } + + /** + * Determine whether an element is set or not. + * + * @param hash the hash value of element. + * @return false if the element is not set, true if the element is probably set. + */ + public boolean find(long hash) { +int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1); +int key = (int) hash; +setMask(key); + +int startIndex = bucketIndex * BYTES_PER_BUCKET; +byteBuf.getBytes(startIndex, tempBucket); +for (int i = 0; i < 8; i++) { + byte set = 0; + int bitsetIndex = i * 4; + set |= tempBucket[bitsetIndex] & ((byte) (mask[i] >>> 24)); + set |= tempBucket[(bitsetIndex + 1)] & ((byte) (mask[i] >>> 16)); + set |= tempBucket[(bitsetIndex + 2)] & ((byte) (mask[i] >>> 8)); + set |= tempBucket[(bitsetIndex + 3)] & ((byte) mask[i]); + if (0 == set) { +return false; + } +} +return true; + } + + /** + * Merge this bloom filter with other one + * @param other + */ + public void or(BloomFilter other) { +int otherLength = other.byteBuf.capacity(); +int thisLength = this.byteBuf.capacity(); +assert otherLength == thisLength; +//to avoid checking times of Byte
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205938123

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
## @@ -0,0 +1,176 @@
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter
+ * Filters", the main theory is to construct tiny bucket bloom filters which
+ * benefit to the cpu cache and SIMD opcode.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;

Review comment:
BYTES is right: these sizes are in bytes. I will update the naming to make that explicit.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
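The reply confirms that these constants count bytes. Here is a minimal sketch of the clamp-then-align logic from the quoted adjustByteSize(), using byte-explicit names. The class BloomSizing and the *_BYTES constant names are hypothetical; they are not necessarily the names the PR ended up using.

```java
/** Hypothetical helper mirroring the clamp-then-align logic of the quoted adjustByteSize(). */
final class BloomSizing {
  static final int MINIMUM_BLOOM_BYTES = 256;               // lower bound, in bytes
  static final int MAXIMUM_BLOOM_BYTES = 16 * 1024 * 1024;  // upper bound, in bytes (16 MiB)
  static final int BYTES_PER_BUCKET = 32;                   // one bucket = 32 bytes

  static int adjustByteSize(int numBytes) {
    // Clamp into [MINIMUM_BLOOM_BYTES, MAXIMUM_BLOOM_BYTES] first.
    int clamped = Math.max(MINIMUM_BLOOM_BYTES, Math.min(numBytes, MAXIMUM_BLOOM_BYTES));
    // Round up to a whole number of buckets: add 31, then clear the low 5 bits.
    return (clamped + (BYTES_PER_BUCKET - 1)) & ~(BYTES_PER_BUCKET - 1);
  }
}
```

For example, a request for 1,000 bytes becomes 1,024, and a request for 100 bytes is raised to the 256-byte minimum.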
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205938105

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
## @@ -0,0 +1,176 @@
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter
+ * Filters", the main theory is to construct tiny bucket bloom filters which
+ * benefit to the cpu cache and SIMD opcode.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
+
+
+  public BloomFilter(DrillBuf byteBuf) {
+    this.byteBuf = byteBuf;
+    this.numBytes = byteBuf.capacity();
+    this.byteBuf.writerIndex(numBytes);
+  }
+
+
+  public static int adjustByteSize(int numBytes) {
+    if (numBytes < MINIMUM_BLOOM_SIZE) {
+      numBytes = MINIMUM_BLOOM_SIZE;
+    }
+
+    if (numBytes > DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE) {
+      numBytes = DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE;
+    }
+
+    // 32 bytes alignment, one bucket.
+    numBytes = (numBytes + 0x1F) & (~0x1F);
+    return numBytes;
+  }
+
+  private void setMask(int key) {
+    final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};

Review comment:
Those 8 odd numbers were taken from Impala's implementation (see bloom-filter.h). They act as salt values in the mask calculation. I will add some comments to this method to make that clear.
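The reply says the eight odd constants act as salts for the mask. A small sketch makes that role concrete. It is a hypothetical helper, not the PR's code. For each salt, it multiplies the low 32 bits of the hash by the salt (the product wraps modulo 2^32) and keeps the top five bits of the product as a bit position. As a result, every one of the eight mask words has exactly one bit set. Because odd multipliers are invertible modulo 2^32, distinct keys always give distinct products before the top bits are taken. The helper also reproduces the quoted `>> 27` followed by `0x1 << n` form so the two can be compared.

```java
/** Hypothetical sketch of the salted mask computation (salts as quoted from the PR). */
final class SaltedMask {
  private static final int[] SALT = {
      0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
      0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};

  /** Returns 8 words, each with exactly one bit set, for the low 32 bits of a hash. */
  static int[] mask(int key) {
    int[] mask = new int[SALT.length];
    for (int i = 0; i < SALT.length; i++) {
      // Multiply by an odd salt, then keep the top 5 bits as a bit index in 0..31.
      int bitIndex = (key * SALT[i]) >>> 27;
      mask[i] = 1 << bitIndex;
    }
    return mask;
  }

  /** Word i computed as in the quoted setMask(): arithmetic shift, then 0x1 << n. */
  static int maskWordAsQuoted(int key, int i) {
    int shifted = (key * SALT[i]) >> 27;  // in -16..-1 when the product is negative
    return 0x1 << shifted;                // Java uses only the low 5 bits of the shift distance
  }
}
```

Java int shifts use only the low five bits of the shift distance (JLS 15.19). The sign-extended result of `>> 27` therefore selects the same bit as `>>> 27`, and the unsigned form just makes that intent explicit.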
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204974446

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
## @@ -0,0 +1,41 @@
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {

Review comment:
The current implementation generates one Bloom filter per join column, so your multi-column join example would produce two Bloom filters. The reason for this design was to perform one hash computation per column-vector memory access. However, the Murmur hash computation is expensive and hurt pipeline performance, so the overall result was not good. I will therefore change it to the implementation you assumed.
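The reply describes a move from one Bloom filter per join column to a single filter over all join columns. The sketch below is a hypothetical illustration of that idea, not Drill's code. It folds per-column hashes into one 64-bit hash; the folding scheme, built on MurmurHash3's fmix64 finalizer, is an assumption. It uses exact HashSets as stand-ins for Bloom filters so the outcome is deterministic. It also shows one side effect of the combined design. Per-column filters accept a probe row whose values each appear on the build side but never together in one row.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; HashSets stand in for Bloom filters (no false positives). */
final class JoinKeyFilters {
  /** MurmurHash3's 64-bit finalizer, used here only as a bit mixer. */
  static long fmix64(long k) {
    k ^= k >>> 33;
    k *= 0xff51afd7ed558ccdL;
    k ^= k >>> 33;
    k *= 0xc4ceb9fe1a85ec53L;
    k ^= k >>> 33;
    return k;
  }

  /** Folds per-column hashes into one order-sensitive 64-bit hash (assumed scheme). */
  static long combine(long... columnHashes) {
    long h = 0;
    for (long c : columnHashes) {
      h = fmix64(h ^ c);
    }
    return h;
  }

  /** Returns {passesPerColumnFilters, passesCombinedFilter} for one probe row. */
  static boolean[] probe(long[][] buildRows, long[] probeRow) {
    int cols = probeRow.length;
    List<Set<Long>> perColumn = new ArrayList<>();
    for (int c = 0; c < cols; c++) {
      perColumn.add(new HashSet<>());
    }
    Set<Long> combined = new HashSet<>();
    for (long[] row : buildRows) {
      long[] hashes = new long[cols];
      for (int c = 0; c < cols; c++) {
        hashes[c] = fmix64(row[c]);  // stand-in for a real per-column hash
        perColumn.get(c).add(hashes[c]);
      }
      combined.add(combine(hashes));  // one entry per build row, all columns together
    }
    boolean perColumnPass = true;
    long[] probeHashes = new long[cols];
    for (int c = 0; c < cols; c++) {
      probeHashes[c] = fmix64(probeRow[c]);
      perColumnPass &= perColumn.get(c).contains(probeHashes[c]);
    }
    return new boolean[] {perColumnPass, combined.contains(combine(probeHashes))};
  }
}
```

With build rows (1, 10) and (2, 20), the probe row (1, 20) passes both per-column checks but is rejected by the combined check. The combined design also computes one hash per probe row rather than one lookup per column.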
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204973381

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
## @@ -0,0 +1,586 @@
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
+  private Map> joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * This class maintains context for the runtime join push down's filter management. It
+   * does a traversal of the physical operators by leveraging the root wrapper which indirectly
+   * holds the global PhysicalOperator tree and contains the minor fragment endpoints.
+   * @param workUnit
+   * @param drillbitContext
+   */
+  public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
+    this.rootWrapper = workUnit.getRootWrapper();
+    this.dril
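The three prerequisites in the class Javadoc can be checked with a recursive walk over the probe side. This is a hypothetical simplification, not Drill's AbstractPhysicalVisitor-based traversal. The Op and Kind types are invented for illustration, and child 0 of a join is assumed to be the probe side. The set of blocking operators (HashAgg, Sort) is also an assumption. So is the extra rule that the probe side must contain a scan, which follows from a runtime filter being applied at a scan.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified operator tree; not Drill's PhysicalOperator API. */
final class JppdEligibility {
  enum Kind { SCAN, FILTER, PROJECT, EXCHANGE, HASH_AGG, SORT, HASH_JOIN }

  // Operators that consume all input before emitting rows (assumed list).
  static final Set<Kind> BLOCKING = EnumSet.of(Kind.HASH_AGG, Kind.SORT);

  static final class Op {
    final Kind kind;
    final boolean equalityCondition;  // only meaningful for joins
    final List<Op> children;          // for HASH_JOIN: index 0 = probe, 1 = build (assumption)

    Op(Kind kind, boolean equalityCondition, Op... children) {
      this.kind = kind;
      this.equalityCondition = equalityCondition;
      this.children = Arrays.asList(children);
    }

    Op(Kind kind, Op... children) {
      this(kind, false, children);
    }
  }

  static boolean eligible(Op join) {
    // Prerequisites 1 and 2: an equality condition on a HashJoin.
    if (join.kind != Kind.HASH_JOIN || !join.equalityCondition) {
      return false;
    }
    // Prerequisite 3: no blocking operator on the probe side.
    Op probeSide = join.children.get(0);
    return containsKind(probeSide, EnumSet.of(Kind.SCAN))
        && !containsKind(probeSide, BLOCKING);
  }

  private static boolean containsKind(Op op, Set<Kind> kinds) {
    if (kinds.contains(op.kind)) {
      return true;
    }
    for (Op child : op.children) {
      if (containsKind(child, kinds)) {
        return true;
      }
    }
    return false;
  }
}
```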
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204973345

## File path: exec/java-exec/src/main/resources/drill-module.conf
## @@ -455,6 +455,8 @@ drill.exec.options: {
     exec.hashjoin.num_partitions: 32,
     exec.hashjoin.num_rows_in_batch: 1024,
     exec.hashjoin.max_batches_in_memory: 0,
+    exec.hashjoin.enable.runtime_filter: true,

Review comment:
Agreed.
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204973201

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
## @@ -0,0 +1,586 @@
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204965485

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
## @@ -0,0 +1,586 @@
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r200521164

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
## @@ -0,0 +1,755 @@
+package org.apache.drill.exec.work.filter;
+
+import com.google.common.collect.Sets;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
+  private Map> joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private QueryContext queryContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+
+
+  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * This class maintains context for the runtime join push down's filter management. It
+   * does a traversal of the physical operators by leveraging the root wrapper which indirectly
+   * holds the global PhysicalOperator tree and contains the minor fragment endpoints.
+   * @param workUnit
+   * @param queryContext
+   */
+  public Ru
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r200520720

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
## @@ -0,0 +1,755 @@
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r200520207 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ## @@ -375,6 +378,30 @@ protected void cleanup() { public FragmentExecutor getFragmentRunner(final FragmentHandle handle) { return runningFragments.get(handle); } + +public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) { + BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef(); + boolean toForeman = runtimeFilterDef.getToForeman(); + QueryId queryId = runtimeFilterDef.getQueryId(); + //to foreman + if (toForeman) { +Foreman foreman = queries.get(queryId); +if (foreman != null) { Review comment: Yes, it's meant to handle two cases: the foreman has crashed, or the whole query has already completed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
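The `foreman != null` check above guards against a filter arriving for a query whose foreman is gone. The following is a minimal sketch of that routing pattern. String query ids and `Consumer` "foremen" are hypothetical stand-ins for Drill's `QueryId` and `Foreman`, and the drop counter is added only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch: String query ids and Consumer "foremen" stand in for Drill's QueryId and Foreman.
final class RuntimeFilterRouter {
    private final Map<String, Consumer<String>> runningQueries = new ConcurrentHashMap<>();
    private final AtomicInteger droppedFilters = new AtomicInteger();

    void register(String queryId, Consumer<String> foreman) {
        runningQueries.put(queryId, foreman);
    }

    void unregister(String queryId) {
        runningQueries.remove(queryId);
    }

    // Returns true if the filter reached the foreman, false if it was dropped.
    boolean routeToForeman(String queryId, String filterPayload) {
        Consumer<String> foreman = runningQueries.get(queryId);
        if (foreman == null) {
            // Foreman crashed or the query already completed: nobody can use this filter.
            // (A real implementation would likely also release any buffers the filter holds.)
            droppedFilters.incrementAndGet();
            return false;
        }
        foreman.accept(filterPayload);
        return true;
    }

    int droppedCount() {
        return droppedFilters.get();
    }

    public static void main(String[] args) {
        RuntimeFilterRouter router = new RuntimeFilterRouter();
        List<String> received = new ArrayList<>();
        router.register("query-1", received::add);
        router.routeToForeman("query-1", "bloom-filter-a");
        router.unregister("query-1"); // the query finishes early
        router.routeToForeman("query-1", "bloom-filter-b");
        System.out.println(received + " dropped=" + router.droppedCount());
    }
}
```

A concurrent map is used because filters can arrive from data-channel threads while queries are registered and removed elsewhere.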
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r200519816 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java ## @@ -76,7 +77,7 @@ public boolean inScopeOf(OptionScope scope) { } public enum Kind { -BOOLEAN, LONG, STRING, DOUBLE +BOOLEAN, LONG, STRING, DOUBLE, INTEGER Review comment: The reason for adding an INTEGER option kind is that the largest Bloom filter (BF) byte size is bounded by Integer.MAX_VALUE, which is the limit of a DrillBuf's size. An INTEGER option therefore expresses this bound precisely.
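To show why an int-sized bound is the natural limit, here is a sketch that sizes a Bloom filter with the standard textbook formula m = -n ln(p) / (ln 2)^2 bits. It then rejects sizes whose byte count exceeds Integer.MAX_VALUE. The class and method names are hypothetical, and this is not necessarily how Drill computes its filter size.

```java
// Hypothetical helper using the standard Bloom filter sizing formula; not Drill's code.
final class BloomFilterSizing {
    // Optimal number of bits for n expected entries at false-positive probability p:
    // m = -n * ln(p) / (ln 2)^2
    static long optimalNumBits(long expectedEntries, double fpp) {
        if (expectedEntries <= 0 || fpp <= 0.0 || fpp >= 1.0) {
            throw new IllegalArgumentException("need expectedEntries > 0 and 0 < fpp < 1");
        }
        double ln2 = Math.log(2);
        double bits = -expectedEntries * Math.log(fpp) / (ln2 * ln2);
        return (long) Math.ceil(bits);
    }

    // Rounds up to whole bytes and rejects sizes an int-indexed buffer cannot hold.
    static int optimalNumBytes(long expectedEntries, double fpp) {
        long bytes = (optimalNumBits(expectedEntries, fpp) + 7) / 8;
        if (bytes > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Bloom filter of " + bytes + " bytes exceeds Integer.MAX_VALUE");
        }
        return (int) bytes;
    }

    public static void main(String[] args) {
        System.out.println(optimalNumBytes(1_000_000L, 0.01));
    }
}
```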
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r200519330 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java ## @@ -151,12 +151,12 @@ public RelOptCost computeSelfCost(final RelOptPlanner planner, RelMetadataQuery @Override public SelectionVectorMode[] getSupportedEncodings() { -return SelectionVectorMode.DEFAULT; +return SelectionVectorMode.NONE_AND_TWO; } @Override public SelectionVectorMode getEncoding() { -return SelectionVectorMode.NONE; +return SelectionVectorMode.TWO_BYTE; Review comment: What do you mean by `NONE_AND_TWO`? The return type here is `SelectionVectorMode`, not an array. If JPPD is not applicable to a `ScanBatch`, the output container's build schema will always be `SelectionVectorMode.NONE`, and no SV2 will be generated.
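The distinction in this exchange is between the *set* of modes an operator supports, which is an array, and the single mode a given batch actually carries. A hypothetical, simplified model of that distinction (not Drill's `BatchSchema.SelectionVectorMode`) might look like this:

```java
import java.util.Arrays;

// Hypothetical simplified model; not Drill's actual BatchSchema.SelectionVectorMode.
final class ScanEncodingSketch {
    enum SvMode {
        NONE, TWO_BYTE, FOUR_BYTE;

        // A set of acceptable modes is naturally an array, as in getSupportedEncodings().
        static final SvMode[] NONE_AND_TWO = {NONE, TWO_BYTE};
    }

    // Planning time: the modes the scan may produce.
    static SvMode[] supportedEncodings() {
        return SvMode.NONE_AND_TWO.clone();
    }

    static boolean supports(SvMode mode) {
        return Arrays.asList(supportedEncodings()).contains(mode);
    }

    // Run time: each outgoing batch carries exactly one mode; without JPPD no SV2 is produced.
    static SvMode outputEncoding(boolean runtimeFilterApplied) {
        return runtimeFilterApplied ? SvMode.TWO_BYTE : SvMode.NONE;
    }

    public static void main(String[] args) {
        System.out.println(outputEncoding(false) + " " + outputEncoding(true));
    }
}
```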
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r200518460 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java ## @@ -151,12 +151,12 @@ public RelOptCost computeSelfCost(final RelOptPlanner planner, RelMetadataQuery @Override public SelectionVectorMode[] getSupportedEncodings() { -return SelectionVectorMode.DEFAULT; +return SelectionVectorMode.NONE_AND_TWO; Review comment: Yes, I will update that.
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199395144 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ## @@ -696,6 +780,18 @@ public void executeBuildPhase() throws SchemaChangeException { if ( cycleNum > 0 ) { read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast(); } +//create runtime filter +if (cycleNum == 0 && enableRuntimeFilter) { + //create runtime filter and send out async + int condFieldIndex = 0; + for (BloomFilter bloomFilter : bloomFilters) { +for (int ind = 0; ind < currentRecordCount; ind++) { + long hashCode = hash64.hash64Code(ind, 0, condFieldIndex); + bloomFilter.insert(hashCode); Review comment: I will enhance this in a separate JIRA.
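The build-phase loop above computes one 64-bit hash per build-side row and inserts it into a Bloom filter. Below is a minimal, self-contained sketch of a filter keyed by such precomputed hashes. It derives its probe positions by double hashing (low and high 32-bit halves). That is a common technique chosen here for illustration and may differ from how Drill's `BloomFilter` sets bits.

```java
import java.util.BitSet;

// Minimal Bloom filter keyed by precomputed 64-bit hash codes.
// A sketch only: Drill's own BloomFilter class may lay out bits and derive probes differently.
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(int numBits, int numHashes) {
        if (numBits <= 0 || numHashes <= 0) {
            throw new IllegalArgumentException("numBits and numHashes must be positive");
        }
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // Double hashing: probe i uses h1 + i * h2, where h1/h2 are the low/high halves of the hash.
    private int position(long hash64, int i) {
        int h1 = (int) hash64;
        int h2 = (int) (hash64 >>> 32);
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void insert(long hash64) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(position(hash64, i));
        }
    }

    // false means "definitely absent"; true means "possibly present".
    boolean mightContain(long hash64) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(hash64, i))) {
                return false;
            }
        }
        return true;
    }

    // Build phase: one insert per build-side row, mirroring the loop under review.
    static SimpleBloomFilter fromBuildSide(long[] buildRowHashes, int numBits, int numHashes) {
        SimpleBloomFilter filter = new SimpleBloomFilter(numBits, numHashes);
        for (long hash : buildRowHashes) {
            filter.insert(hash);
        }
        return filter;
    }

    public static void main(String[] args) {
        long[] buildHashes = {0x1234_5678_9ABC_DEF0L, 42L, -7L};
        SimpleBloomFilter filter = fromBuildSide(buildHashes, 1 << 12, 3);
        System.out.println(filter.mightContain(42L)); // inserted, so always true
    }
}
```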
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199374038 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java ## @@ -32,35 +33,53 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.drill.exec.physical.base.AbstractJoinPop; +import org.apache.drill.exec.work.filter.RuntimeFilterDef; + @JsonTypeName("hash-join") +@JsonIgnoreProperties(ignoreUnknown = true) Review comment: The primary reason is backward compatibility: physical plans without the `runtimeFilterDef` property (for example, from versions that do not support RuntimeFilter) can still work. That is what `ignoreUnknown = true` is for. `@JsonIgnore`, by contrast, keeps the property out of the serialized JSON, which is not what we want.
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199369589 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ## @@ -226,6 +244,96 @@ public IterOutcome next() { } } + private void applyRuntimeFilter() throws SchemaChangeException { +RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter(); +if (runtimeFilterWritable == null) { + return; +} +if (recordCount <= 0) { + return; +} +List bloomFilters = runtimeFilterWritable.unwrap(); +if (hash64 == null) { + ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(this, context); + try { +//generate hash helper +this.toFilterFields = runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList(); +List hashFieldExps = new ArrayList<>(); +List typedFieldIds = new ArrayList<>(); +for (String toFilterField : toFilterFields) { + SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN); + TypedFieldId typedFieldId = container.getValueVectorId(schemaPath); + this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]); + typedFieldIds.add(typedFieldId); + ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId); + hashFieldExps.add(toHashFieldExp); +} +hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()])); + } catch (Exception e) { +throw UserException.internalError(e).build(logger); + } +} +selectionVector2.allocateNew(recordCount); +BitSet bitSet = new BitSet(recordCount); +for (int i = 0; i < toFilterFields.size(); i++) { + BloomFilter bloomFilter = bloomFilters.get(i); + String fieldName = toFilterFields.get(i); + computeBitSet(field2id.get(fieldName), bloomFilter, bitSet); +} +int svIndex = 0; +int tmpFilterRows = 0; +for (int i = 0; i < recordCount; i++) { + boolean 
contain = bitSet.get(i); + if (contain) { +selectionVector2.setIndex(svIndex, i); +svIndex++; + } else { +tmpFilterRows++; + } +} +selectionVector2.setRecordCount(svIndex); +if (tmpFilterRows > 0 && tmpFilterRows == recordCount) { + recordCount = 0; + selectionVector2.clear(); + logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows); + return; +} +if (tmpFilterRows > 0 && tmpFilterRows != recordCount ) { + totalFilterRows = totalFilterRows + tmpFilterRows; + recordCount = svIndex; + BatchSchema batchSchema = this.schema; + VectorContainer backUpContainer = new VectorContainer(this.oContext.getAllocator(), batchSchema); + int fieldCount = batchSchema.getFieldCount(); + for (int i = 0; i < fieldCount; i++) { +ValueVector from = this.getContainer().getValueVector(i).getValueVector(); +ValueVector to = backUpContainer.getValueVector(i).getValueVector(); +to.setInitialCapacity(svIndex); +for (int r = 0; r < svIndex; r++) { + to.copyEntry(r, from, selectionVector2.getIndex(r)); Review comment: @aman Thanks for the other valuable reviews; I will update those soon. On this point, I initially intended to output the SV2, but gave it up for the following reasons: * Not every operator supports the SelectionModel. If a user's rule pushes down the filter conditions, the output SV2 may not be processed by operators that lack this support (i.e. there is no guarantee that an operator supporting the SelectionModel sits above the Scan node). * The BatchSchema's SelectionModel would also become a runtime variable. This would affect the code-gen logic of the filter node above, which would have to dynamically generate fresh filter code over the Scan's SV2. I agree that the memory-copy cost would be lower if the filter node above could filter more rows over the Scan's SV2. So what is your opinion on this?
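The trade-off discussed here is between emitting a selection vector of surviving row indices and copying the surviving rows into a dense batch. The sketch below contrasts the two on plain arrays. It is a hypothetical illustration, not Drill's `ScanBatch` code. A `LongPredicate` stands in for a Bloom filter probe, and an `int[]` replaces the 2-byte indices a real SV2 stores.

```java
import java.util.Arrays;
import java.util.function.LongPredicate;

// Hypothetical sketch contrasting the two options discussed; not Drill's ScanBatch code.
final class RuntimeFilterSelection {
    // Option 1: record surviving row indices (Drill's SV2 stores 2-byte indices;
    // an int[] is used here for simplicity). Returns the number of selected rows.
    static int select(long[] rowHashes, LongPredicate mightContain, int[] selectionVector) {
        int svIndex = 0;
        for (int row = 0; row < rowHashes.length; row++) {
            if (mightContain.test(rowHashes[row])) {
                selectionVector[svIndex++] = row;
            }
        }
        return svIndex;
    }

    // Option 2: materialize only the selected rows, so downstream operators see a plain
    // batch with no selection vector, at the cost of one copy per surviving value.
    static long[] compact(long[] column, int[] selectionVector, int selectedCount) {
        long[] out = new long[selectedCount];
        for (int r = 0; r < selectedCount; r++) {
            out[r] = column[selectionVector[r]];
        }
        return out;
    }

    public static void main(String[] args) {
        long[] hashes = {10, 11, 12, 13};
        long[] values = {100, 101, 102, 103};
        int[] sv = new int[hashes.length];
        int count = select(hashes, h -> h % 2 == 0, sv); // stand-in for a Bloom filter probe
        System.out.println(Arrays.toString(compact(values, sv, count)));
    }
}
```

Option 2 corresponds to the copy-based approach in the diff above. It avoids requiring SV2 support from whatever operator sits above the scan, but it pays for a copy of every surviving value.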
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199113139 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java ## @@ -32,35 +33,53 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.drill.exec.physical.base.AbstractJoinPop; +import org.apache.drill.exec.work.filter.RuntimeFilterDef; + @JsonTypeName("hash-join") +@JsonIgnoreProperties(ignoreUnknown = true) public class HashJoinPOP extends AbstractJoinPop { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinPOP.class); + private RuntimeFilterDef runtimeFilterDef; Review comment: RuntimeFilterDef is created after the physical plan has been generated. Constructing the BloomFilter at the HashJoin node is heavy work, so for a lightweight join query (e.g. two small tables) building a bloom filter is not worthwhile. I therefore left a TODO in the RuntimeFilterManager.quelifiedHolders method to evaluate, based on the NDV of both sides, whether the RuntimeFilter should be applied. NDV statistics are not supported yet; I will open a separate work item on the metadata side to support them.
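The comment defers an NDV-based decision on whether building a runtime filter pays off. Purely as an illustration of what such a check could look like, here is a hypothetical heuristic. The method name, both thresholds, and the "containment" assumption (the smaller key set is contained in the larger, so roughly buildNdv / probeNdv of probe keys survive) are all assumptions, not anything the PR defines.

```java
// Hypothetical heuristic for the NDV-based check left as a TODO; the thresholds,
// method name and containment assumption are illustrative, not Drill code.
final class RuntimeFilterPolicy {
    static boolean worthBuilding(long buildNdv, long probeNdv, long probeRowCount,
                                 long minProbeRows, double maxPassFraction) {
        if (buildNdv <= 0 || probeNdv <= 0) {
            return false; // no usable statistics: skip the filter
        }
        if (probeRowCount < minProbeRows) {
            return false; // small join: building the filter likely costs more than it saves
        }
        // Containment assumption: about buildNdv / probeNdv of the probe keys pass the filter.
        double passFraction = Math.min(1.0, (double) buildNdv / probeNdv);
        return passFraction <= maxPassFraction;
    }

    public static void main(String[] args) {
        System.out.println(worthBuilding(100, 10_000, 5_000_000, 100_000, 0.5));
    }
}
```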