[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-14 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r210146731
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.work.filter.BloomFilter;
+import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch 
participates
+ * in the HashJoinBatch and can be applied by a RuntimeFilter, it will 
generate a filtered
+ * SV2, otherwise will generate a same recordCount-originalRecordCount SV2 
which will not affect
+ * the Query's performance ,but just do a memory transfer by the later 
RemovingRecordBatch op.
+ */
+public class RuntimeFilterRecordBatch extends 
AbstractSingleRecordBatch {
+  private SelectionVector2 sv2;
+
+  private ValueVectorHashHelper.Hash64 hash64;
+  private Map field2id = new HashMap<>();
+  private List toFilterFields;
+  private int originalRecordCount;
+  private int recordCount;
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
+
+  public RuntimeFilterRecordBatch(Filter pop, RecordBatch incoming, 
FragmentContext context) throws OutOfMemoryException {
+super(pop, context, incoming);
+  }
+
+  @Override
+  public FragmentContext getContext() {
+return context;
+  }
+
+  @Override
+  public int getRecordCount() {
+return sv2.getCount();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+return sv2;
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+return null;
+  }
+
+  @Override
+  protected IterOutcome doWork() {
+container.transferIn(incoming.getContainer());
+originalRecordCount = incoming.getRecordCount();
+sv2.setOriginalRecordCount(originalRecordCount);
+try {
+  applyRuntimeFilter();
+} catch (SchemaChangeException e) {
+  throw new UnsupportedOperationException(e);
+}
+return getFinalOutcome(false);
+  }
+
+  @Override
+  public void close() {
+if (sv2 != null) {
+  sv2.clear();
+}
+super.close();
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+if (sv2 != null) {
+  sv2.clear();
+}
+
+switch (incoming.getSchema().getSelectionVectorMode()) {
+  case NONE:
+if (sv2 == null) {
+  sv2 = new SelectionVector2(oContext.getAllocator());
+}
+break;
+  case TWO_BYTE:
+sv2 = new SelectionVector2(oContext.getAllocator());
+break;
+  case FOUR_BYTE:
+
+  default:
+throw new UnsupportedOperationException();
+}
+
+if (container.isSchemaChanged()) {
+  container.buildSchema(SelectionVectorMode.TWO_BYTE);
+  return true;
+}
+return false;
+  }
+
+  /**
+   *
+   * @return

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-14 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r210146226
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.work.filter.BloomFilter;
+import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch 
participates
+ * in the HashJoinBatch and can be applied by a RuntimeFilter, it will 
generate a filtered
+ * SV2, otherwise will generate a same recordCount-originalRecordCount SV2 
which will not affect
+ * the Query's performance ,but just do a memory transfer by the later 
RemovingRecordBatch op.
+ */
+public class RuntimeFilterRecordBatch extends 
AbstractSingleRecordBatch {
+  private SelectionVector2 sv2;
+
+  private ValueVectorHashHelper.Hash64 hash64;
+  private Map field2id = new HashMap<>();
+  private List toFilterFields;
+  private int originalRecordCount;
+  private int recordCount;
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
+
+  public RuntimeFilterRecordBatch(Filter pop, RecordBatch incoming, 
FragmentContext context) throws OutOfMemoryException {
+super(pop, context, incoming);
+  }
+
+  @Override
+  public FragmentContext getContext() {
+return context;
+  }
+
+  @Override
+  public int getRecordCount() {
+return sv2.getCount();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+return sv2;
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+return null;
+  }
+
+  @Override
+  protected IterOutcome doWork() {
+container.transferIn(incoming.getContainer());
+originalRecordCount = incoming.getRecordCount();
+sv2.setOriginalRecordCount(originalRecordCount);
+try {
+  applyRuntimeFilter();
+} catch (SchemaChangeException e) {
+  throw new UnsupportedOperationException(e);
+}
+return getFinalOutcome(false);
+  }
+
+  @Override
+  public void close() {
+if (sv2 != null) {
+  sv2.clear();
+}
+super.close();
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+if (sv2 != null) {
+  sv2.clear();
+}
+
+switch (incoming.getSchema().getSelectionVectorMode()) {
+  case NONE:
+if (sv2 == null) {
+  sv2 = new SelectionVector2(oContext.getAllocator());
+}
+break;
+  case TWO_BYTE:
+sv2 = new SelectionVector2(oContext.getAllocator());
+break;
+  case FOUR_BYTE:
+
+  default:
+throw new UnsupportedOperationException();
+}
+
+if (container.isSchemaChanged()) {
+  container.buildSchema(SelectionVectorMode.TWO_BYTE);
+  return true;
+}
+return false;
+  }
+
+  /**
+   *
+   * @return

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-14 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r209853440
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.work.filter.BloomFilter;
+import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch 
participates
+ * in the HashJoinBatch and can be applied by a RuntimeFilter, it will 
generate a filtered
+ * SV2, otherwise will generate a same recordCount-originalRecordCount SV2 
which will not affect
+ * the Query's performance ,but just do a memory transfer by the later 
RemovingRecordBatch op.
+ */
+public class RuntimeFilterRecordBatch extends 
AbstractSingleRecordBatch {
+  private SelectionVector2 sv2;
+
+  private ValueVectorHashHelper.Hash64 hash64;
+  private Map field2id = new HashMap<>();
+  private List toFilterFields;
+  private int originalRecordCount;
+  private int recordCount;
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
+
+  public RuntimeFilterRecordBatch(Filter pop, RecordBatch incoming, 
FragmentContext context) throws OutOfMemoryException {
+super(pop, context, incoming);
+  }
+
+  @Override
+  public FragmentContext getContext() {
+return context;
+  }
+
+  @Override
+  public int getRecordCount() {
+return sv2.getCount();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+return sv2;
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+return null;
+  }
+
+  @Override
+  protected IterOutcome doWork() {
+container.transferIn(incoming.getContainer());
+originalRecordCount = incoming.getRecordCount();
+sv2.setOriginalRecordCount(originalRecordCount);
+try {
+  applyRuntimeFilter();
+} catch (SchemaChangeException e) {
+  throw new UnsupportedOperationException(e);
+}
+return getFinalOutcome(false);
+  }
+
+  @Override
+  public void close() {
+if (sv2 != null) {
+  sv2.clear();
+}
+super.close();
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+if (sv2 != null) {
+  sv2.clear();
+}
+
+switch (incoming.getSchema().getSelectionVectorMode()) {
+  case NONE:
+if (sv2 == null) {
+  sv2 = new SelectionVector2(oContext.getAllocator());
+}
+break;
+  case TWO_BYTE:
+sv2 = new SelectionVector2(oContext.getAllocator());
+break;
+  case FOUR_BYTE:
+
+  default:
+throw new UnsupportedOperationException();
+}
+
+if (container.isSchemaChanged()) {
+  container.buildSchema(SelectionVectorMode.TWO_BYTE);
+  return true;
+}
+return false;
+  }
+
+  /**
+   *
+   * @return

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-14 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r209851687
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.work.filter.BloomFilter;
+import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch 
participates
+ * in the HashJoinBatch and can be applied by a RuntimeFilter, it will 
generate a filtered
+ * SV2, otherwise will generate a same recordCount-originalRecordCount SV2 
which will not affect
+ * the Query's performance ,but just do a memory transfer by the later 
RemovingRecordBatch op.
+ */
+public class RuntimeFilterRecordBatch extends 
AbstractSingleRecordBatch {
+  private SelectionVector2 sv2;
+
+  private ValueVectorHashHelper.Hash64 hash64;
+  private Map field2id = new HashMap<>();
+  private List toFilterFields;
+  private int originalRecordCount;
+  private int recordCount;
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
+
+  public RuntimeFilterRecordBatch(Filter pop, RecordBatch incoming, 
FragmentContext context) throws OutOfMemoryException {
+super(pop, context, incoming);
+  }
+
+  @Override
+  public FragmentContext getContext() {
+return context;
+  }
+
+  @Override
+  public int getRecordCount() {
+return sv2.getCount();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+return sv2;
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+return null;
+  }
+
+  @Override
+  protected IterOutcome doWork() {
+container.transferIn(incoming.getContainer());
+originalRecordCount = incoming.getRecordCount();
+sv2.setOriginalRecordCount(originalRecordCount);
+try {
+  applyRuntimeFilter();
+} catch (SchemaChangeException e) {
+  throw new UnsupportedOperationException(e);
+}
+return getFinalOutcome(false);
+  }
+
+  @Override
+  public void close() {
+if (sv2 != null) {
+  sv2.clear();
+}
+super.close();
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+if (sv2 != null) {
+  sv2.clear();
+}
+
+switch (incoming.getSchema().getSelectionVectorMode()) {
+  case NONE:
+if (sv2 == null) {
+  sv2 = new SelectionVector2(oContext.getAllocator());
+}
+break;
+  case TWO_BYTE:
+sv2 = new SelectionVector2(oContext.getAllocator());
+break;
+  case FOUR_BYTE:
+
+  default:
+throw new UnsupportedOperationException();
+}
+
+if (container.isSchemaChanged()) {
+  container.buildSchema(SelectionVectorMode.TWO_BYTE);
+  return true;
+}
+return false;
+  }
+
+  /**
+   *
+   * @return

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-14 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r209850478
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.RuntimeFilterPOP;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+import java.io.IOException;
+import java.util.List;
+
+public class RuntimeFilterPrel extends SinglePrel{
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RuntimeFilterPrel.class);
+
+  public RuntimeFilterPrel(Prel child){
+super(child.getCluster(), child.getTraitSet(), child);
+  }
+
+  public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode 
child) {
+super(cluster, traits, child);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List inputs) {
+return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0));
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
+RuntimeFilterPOP r =  new RuntimeFilterPOP( 
((Prel)getInput()).getPhysicalOperator(creator));
+return creator.addMetadata(this, r);
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+return SelectionVectorMode.TWO_BYTE;
+  }
+
+  @Override
+  public Prel prepareForLateralUnnestPipeline(List children) {
+return (Prel) this.copy(this.traitSet, children);
 
 Review comment:
   Not understanding the logic here, the `RuntimeFilterPrel` has no planed time 
condition  , do we need to be same as `FilterPrel` ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-08 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r208770741
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
 ##
 @@ -408,6 +424,9 @@ private void runPhysicalPlan(final PhysicalPlan plan) 
throws ExecutionSetupExcep
 fragmentsRunner.setFragmentsInfo(work.getFragments(), 
work.getRootFragment(), work.getRootOperator());
 
 startQueryProcessing();
+if (enableRuntimeFilter) {
+  runtimeFilterManager.waitForComplete();
 
 Review comment:
   sorry for the wrong statement. will change it to the right position .


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-08 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r208770741
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
 ##
 @@ -408,6 +424,9 @@ private void runPhysicalPlan(final PhysicalPlan plan) 
throws ExecutionSetupExcep
 fragmentsRunner.setFragmentsInfo(work.getFragments(), 
work.getRootFragment(), work.getRootOperator());
 
 startQueryProcessing();
+if (enableRuntimeFilter) {
+  runtimeFilterManager.waitForComplete();
 
 Review comment:
   sorry for the wrong state. will change it to the right position .


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-08 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r208770073
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.planner.physical.HashAggPrel;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.planner.physical.SortPrel;
+import org.apache.drill.exec.planner.physical.StreamAggPrel;
+import org.apache.drill.exec.planner.physical.TopNPrel;
+import org.apache.drill.exec.planner.physical.visitor.BasePrelVisitor;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite 
to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+  private Map joinMjId2AggregatedRF = new 
ConcurrentHashMap<>();
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+ 

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-07 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r208168529
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.planner.physical.HashAggPrel;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.planner.physical.StreamAggPrel;
+import org.apache.drill.exec.planner.physical.visitor.BasePrelVisitor;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite 
to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * This class maintains context for the runtime join push down's filter 
management. It
+   * does a traversal of the physical operators by le

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-07 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r208165293
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
 ##
 @@ -408,6 +424,9 @@ private void runPhysicalPlan(final PhysicalPlan plan) 
throws ExecutionSetupExcep
 fragmentsRunner.setFragmentsInfo(work.getFragments(), 
work.getRootFragment(), work.getRootOperator());
 
 startQueryProcessing();
+if (enableRuntimeFilter) {
+  runtimeFilterManager.waitForComplete();
 
 Review comment:
   There is various cases that the network between the foreman and receivers 
break down. Also the norm case is the receiver finished earlier before 
receiving the bloom filter. So the security way is to use the 
`SendingAccountor` to release the allocated `ByteBuf`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-07 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r208161542
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.planner.physical.HashAggPrel;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.planner.physical.StreamAggPrel;
+import org.apache.drill.exec.planner.physical.visitor.BasePrelVisitor;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite 
to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * This class maintains context for the runtime join push down's filter 
management. It
+   * does a traversal of the physical operators by le

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-06 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r207868475
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##
 @@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter
+ * Filters", see http://algo2.iti.kit.edu/singler/publications/cacheefficientbloomfilters-wea2007.pdf";>this
 paper
+ * for details, the main theory is to construct tiny bucket bloom filters 
which benefit to
+ * the cpu cache and SIMD opcode.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE_IN_BYTES = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE_IN_BYTES = 16 * 
1024 * 1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
+
+
+  public BloomFilter(int numBytes, BufferAllocator bufferAllocator) {
+int size = BloomFilter.adjustByteSize(numBytes);
+this.byteBuf = bufferAllocator.buffer(size);
+this.numBytes = byteBuf.capacity();
+this.byteBuf.writerIndex(numBytes);
+  }
+
+  public BloomFilter(int ndv, double fpp, BufferAllocator bufferAllocator) {
+int numBytes = BloomFilter.optimalNumOfBytes(ndv, fpp);
+int size = BloomFilter.adjustByteSize(numBytes);
+this.byteBuf = bufferAllocator.buffer(size);
+this.numBytes = byteBuf.capacity();
+this.byteBuf.writerIndex(numBytes);
+  }
+
+
+  public static int adjustByteSize(int numBytes) {
+if (numBytes < MINIMUM_BLOOM_SIZE_IN_BYTES) {
+  numBytes = MINIMUM_BLOOM_SIZE_IN_BYTES;
+}
+
+if (numBytes > DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE_IN_BYTES) {
+  numBytes = DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE_IN_BYTES;
+}
+
+// 32 bytes alignment, one bucket.
+numBytes = (numBytes + 0x1F) & (~0x1F);
+return numBytes;
+  }
+
+  private void setMask(int key) {
+//8 odd numbers act as salt value to participate in the computation of the 
mask.
+final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 
0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
+
+Arrays.fill(mask, 0);
+
+for (int i = 0; i < 8; ++i) {
+  mask[i] = key * SALT[i];
+}
+
+for (int i = 0; i < 8; ++i) {
+  mask[i] = mask[i] >> 27;
+}
+
+for (int i = 0; i < 8; ++i) {
+  mask[i] = 0x1 << mask[i];
+}
+  }
+
+  /**
+   * Add an element's hash value to this bloom filter.
+   * @param hash hash result of element.
+   */
+  public void insert(long hash) {
+int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1);
+int key = (int) hash;
+setMask(key);
+int initialStartIndex = bucketIndex * BYTES_PER_BUCKET;
+byteBuf.getBytes(initialStartIndex, tempBucket);
+for (int i = 0; i < 8; i++) {
+  //every iterate batch,we set 32 bits
+  int bitsetIndex = i * 4;
+  tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) 
(mask[i] >>> 24));
+  tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | 
(byte) (mask[i] >>> 16));
+  tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | 
(byte) (mask[i] >>> 8));
+  tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | 
(byte) (mask[i]));
+}
+byteBuf.setBytes(initialStartIndex, tempBucket);
+  }
+
+  /**
+   * Determine whether an element is set or not.
+   *
+   * @param hash the hash value of element.
+   * @return false if the element is not set, true if the element is probably 
set.
+   */
+  public boolean find(long hash) {
+int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1);
+int key = (int) hash;
+setMask(key);
+
+int startIndex = bucketIndex * BYTES

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-06 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r207867048
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 ##
 @@ -190,11 +213,21 @@ public IterOutcome next() {
 if (isNewSchema) {
   // Even when recordCount = 0, we should return return OK_NEW_SCHEMA 
if current reader presents a new schema.
   // This could happen when data sources have a non-trivial schema 
with 0 row.
-  container.buildSchema(SelectionVectorMode.NONE);
+  if (firstRuntimeFiltered) {
+container.buildSchema(SelectionVectorMode.TWO_BYTE);
+runtimeFiltered = true;
+  } else {
+container.buildSchema(SelectionVectorMode.NONE);
+  }
 
 Review comment:
   I will take into account this suggestion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-06 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r207866392
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
 ##
 @@ -408,6 +424,9 @@ private void runPhysicalPlan(final PhysicalPlan plan) 
throws ExecutionSetupExcep
 fragmentsRunner.setFragmentsInfo(work.getFragments(), 
work.getRootFragment(), work.getRootOperator());
 
 startQueryProcessing();
+if (enableRuntimeFilter) {
+  runtimeFilterManager.waitForComplete();
 
 Review comment:
   To make sure the `RuntimerFilterManager`'s sending out  `ByteBuf`s to be 
safely cleared out no matter the network successed or failed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-06 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r207860831
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.planner.physical.HashAggPrel;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.planner.physical.StreamAggPrel;
+import org.apache.drill.exec.planner.physical.visitor.BasePrelVisitor;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite 
to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * This class maintains context for the runtime join push down's filter 
management. It
+   * does a traversal of the physical operators by le

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-08-06 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r207836945
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.planner.physical.HashAggPrel;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.planner.physical.StreamAggPrel;
+import org.apache.drill.exec.planner.physical.visitor.BasePrelVisitor;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite 
to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * This class maintains context for the runtime join push down's filter 
management. It
+   * does a traversal of the physical operators by le

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-31 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r206498489
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
 
 Review comment:
   This description will be added at the `ScanBatch`'s `applyRuntimeFilter `.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-31 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r206498489
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
 
 Review comment:
   This description will be add at the `ScanBatch`'s `applyRuntimeFilter `.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-30 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r206055252
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
 
 Review comment:
   make sense.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-30 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r206054896
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter
+ * Filters", the main theory is to construct tiny bucket bloom filters which
+ * benefit to the cpu cache and SIMD opcode.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 
1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
+
+
+  public BloomFilter(DrillBuf byteBuf) {
+this.byteBuf = byteBuf;
+this.numBytes = byteBuf.capacity();
+this.byteBuf.writerIndex(numBytes);
 
 Review comment:
   the DrillBuf's content will be changed by calling setBytes and getBytes 
which not affect the readerIndex and writerIndex. When the BloomFilter was 
created, the DrillBuf's size is sure. The writerIndex setting here in advance 
is also clear and sure.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r206017040
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -528,6 +590,27 @@ private void initializeBuild() {
 
   }
 
+  private void initializeRuntimeFilter() {
+if (!enableRuntimeFilter) {
+  return;
+}
+if (runtimeFilterReporter != null) {
+  return;
+}
+runtimeFilterReporter = new 
RuntimeFilterReporter((ExecutorFragmentContext) context);
+RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
+if (runtimeFilterDef != null) {
 
 Review comment:
   `enableRuntimeFilter` is a global option to decide whether a HashJoin could 
generate RuntimeFilter. But it does not indicate the HashJoin must have a 
RuntimeFilterDef. There's some preconditions defined at the 
`RuntimeFilterManager` such as Left join is not allowed to use a BF to filter 
data. So we can't use `Preconditions.checkState(runtimeFilterDef != null)`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r206015829
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
 
 Review comment:
   Btw, if there's too much multi-column joins , we can add an option to limit 
the generated  BloomFilter numbers. That's to say there's no need to generate 
one BF one joint column. We can choose to abandon generating some BFs to save 
the CPU cost and memory usage.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r206014664
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter
+ * Filters", the main theory is to construct tiny bucket bloom filters which
+ * benefit to the cpu cache and SIMD opcode.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 
1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
 
 Review comment:
   The `DrillBuf.getBytes(int index, byte[] dst)` will override the dst array's 
content. And we always invoke the getBytes method before using the dst array. 
So there's no risk to use a polluted byte array.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r206014664
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter
+ * Filters", the main theory is to construct tiny bucket bloom filters which
+ * benefit to the cpu cache and SIMD opcode.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 
1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
 
 Review comment:
   The `DrillBuf.getBytes(int index, byte[] dst)` will override the dst array's 
content. And we always invoke the getBytes method before usage the dst array. 
So there's no risk to use a polluted byte array.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r206014055
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
 
 Review comment:
   There's no intersection of the two separate BFs. The intersection meaning is 
implemented indirectly at the probe side `ScanBatch`. The separated BFs filter 
the probe side columns independently and set a BitSet (which indicates the 
possible rows matched the join condition ) together. This part of codes is at 
the `ScanBatch`'s `applyRuntimeFilter` and `computeBitSet` methods.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204973201
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite 
to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * This class maintains context for the runtime join push down's filter 
management. It
+   * does a traversal of the physical operators by leveraging the root wrapper 
which indirectly
+   * holds the global PhysicalOperator tree and contains the minor fragment 
endpoints.
+   * @param workUnit
+   * @param drillbitContext
+   */
+  public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext 
drillbitContext) {
+this.rootWrapper = workUnit.getRootWrapper();
+this.dril

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205965375
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
 
 Review comment:
   Rethink about this point , I think the current one join column one bloom 
filter strategy is right. I would prefer this strategy. To your example , let's 
assume t1.a1 is a low cardinality column (assume a sex column ), t1.b1 is a 
high cardinality column. If we take the multi-join column one bloom filter 
strategy, the required bloom filter's memory size should keep the same 
proportion with the joint two column cardinality size. So this is not 
acceptable. To the current implementation, the low cardinality joint column 
will only consume low memory size, the high cardinality column will consume 
another independent part of bloom filter memory. To the high cardinality column 
case ,maybe we can later add an option to decide whether to enable this bloom 
filter to push down.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-28 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205939854
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
+
+  public static BloomFilter spawnOne(int numBytes, BufferAllocator 
bufferAllocator)
+  {
+int size = BloomFilter.adjustByteSize(numBytes);
+DrillBuf drillBuf = bufferAllocator.buffer(size);
+BloomFilter bloomFilter = new BloomFilter(drillBuf);
+return bloomFilter;
+  }
+
+  public static BloomFilter spawnOne(int ndv, double fpp, BufferAllocator 
bufferAllocator)
+  {
+int numBytes = BloomFilter.optimalNumOfBits(ndv, fpp);
 
 Review comment:
   will rename the optimalNumOfBits method to optimalNumOfBytes by dividing the 
return value by 8.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-28 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205939823
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter
+ * Filters", the main theory is to construct tiny bucket bloom filters which
+ * benefit to the cpu cache and SIMD opcode.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 
1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
+
+
+  public BloomFilter(DrillBuf byteBuf) {
+this.byteBuf = byteBuf;
+this.numBytes = byteBuf.capacity();
+this.byteBuf.writerIndex(numBytes);
+  }
+
+
+  public static int adjustByteSize(int numBytes) {
+if (numBytes < MINIMUM_BLOOM_SIZE) {
+  numBytes = MINIMUM_BLOOM_SIZE;
+}
+
+if (numBytes > DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE) {
+  numBytes = DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE;
+}
+
+// 32 bytes alignment, one bucket.
+numBytes = (numBytes + 0x1F) & (~0x1F);
+return numBytes;
+  }
+
+  private void setMask(int key) {
+final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 
0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
+
+Arrays.fill(mask, 0);
+
+for (int i = 0; i < 8; ++i) {
+  mask[i] = key * SALT[i];
+}
+
+for (int i = 0; i < 8; ++i) {
+  mask[i] = mask[i] >> 27;
+}
+
+for (int i = 0; i < 8; ++i) {
+  mask[i] = 0x1 << mask[i];
+}
+  }
+
+  /**
+   *Add an element's hash value to this bloom filter.
+   * @param hash hash result of element.
+   */
+  public void insert(long hash) {
+int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1);
+int key = (int) hash;
+setMask(key);
+int initialStartIndex = bucketIndex * BYTES_PER_BUCKET;
+byteBuf.getBytes(initialStartIndex, tempBucket);
+for (int i = 0; i < 8; i++) {
+  //every iterate batch,we set 32 bits
+  int bitsetIndex = i * 4;
+  tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) 
(mask[i] >>> 24));
+  tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | 
(byte) (mask[i] >>> 16));
+  tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | 
(byte) (mask[i] >>> 8));
+  tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | 
(byte) (mask[i]));
+}
+byteBuf.setBytes(initialStartIndex, tempBucket);
+  }
+
+  /**
+   * Determine whether an element is set or not.
+   *
+   * @param hash the hash value of element.
+   * @return false if the element is not set, true if the element is probably 
set.
+   */
+  public boolean find(long hash) {
+int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1);
+int key = (int) hash;
+setMask(key);
+
+int startIndex = bucketIndex * BYTES_PER_BUCKET;
+byteBuf.getBytes(startIndex, tempBucket);
+for (int i = 0; i < 8; i++) {
+  byte set = 0;
+  int bitsetIndex = i * 4;
+  set |= tempBucket[bitsetIndex] & ((byte) (mask[i] >>> 24));
+  set |= tempBucket[(bitsetIndex + 1)] & ((byte) (mask[i] >>> 16));
+  set |= tempBucket[(bitsetIndex + 2)] & ((byte) (mask[i] >>> 8));
+  set |= tempBucket[(bitsetIndex + 3)] & ((byte) mask[i]);
+  if (0 == set) {
+return false;
+  }
+}
+return true;
+  }
+
+  /**
+   * Merge this bloom filter with other one
+   * @param other
+   */
+  public void or(BloomFilter other) {
+int otherLength = other.byteBuf.capacity();
+int thisLength = this.byteBuf.capacity();
+assert otherLength == thisLength;
+//to avoid checking times of Byte

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-28 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205938739
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter
+ * Filters", the main theory is to construct tiny bucket bloom filters which
+ * benefit to the cpu cache and SIMD opcode.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 
1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
 
 Review comment:
   The reason to keep it as a member variable is to avoid heap memory GC while 
invoking the corresponding methods frequently. If move it to the locally 
methods, it will break down the performance . 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-28 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205938434
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter
+ * Filters", the main theory is to construct tiny bucket bloom filters which
+ * benefit to the cpu cache and SIMD opcode.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 
1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
+
+
+  public BloomFilter(DrillBuf byteBuf) {
+this.byteBuf = byteBuf;
+this.numBytes = byteBuf.capacity();
+this.byteBuf.writerIndex(numBytes);
+  }
+
+
+  public static int adjustByteSize(int numBytes) {
+if (numBytes < MINIMUM_BLOOM_SIZE) {
+  numBytes = MINIMUM_BLOOM_SIZE;
+}
+
+if (numBytes > DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE) {
+  numBytes = DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE;
+}
+
+// 32 bytes alignment, one bucket.
+numBytes = (numBytes + 0x1F) & (~0x1F);
+return numBytes;
+  }
+
+  private void setMask(int key) {
+final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 
0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
+
+Arrays.fill(mask, 0);
+
+for (int i = 0; i < 8; ++i) {
+  mask[i] = key * SALT[i];
+}
+
+for (int i = 0; i < 8; ++i) {
+  mask[i] = mask[i] >> 27;
+}
+
+for (int i = 0; i < 8; ++i) {
+  mask[i] = 0x1 << mask[i];
+}
+  }
+
+  /**
+   *Add an element's hash value to this bloom filter.
+   * @param hash hash result of element.
+   */
+  public void insert(long hash) {
+int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1);
+int key = (int) hash;
+setMask(key);
+int initialStartIndex = bucketIndex * BYTES_PER_BUCKET;
+byteBuf.getBytes(initialStartIndex, tempBucket);
+for (int i = 0; i < 8; i++) {
+  //every iterate batch,we set 32 bits
+  int bitsetIndex = i * 4;
+  tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) 
(mask[i] >>> 24));
+  tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | 
(byte) (mask[i] >>> 16));
+  tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | 
(byte) (mask[i] >>> 8));
+  tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | 
(byte) (mask[i]));
+}
+byteBuf.setBytes(initialStartIndex, tempBucket);
+  }
+
+  /**
+   * Determine whether an element is set or not.
+   *
+   * @param hash the hash value of element.
+   * @return false if the element is not set, true if the element is probably 
set.
+   */
+  public boolean find(long hash) {
+int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1);
+int key = (int) hash;
+setMask(key);
+
+int startIndex = bucketIndex * BYTES_PER_BUCKET;
+byteBuf.getBytes(startIndex, tempBucket);
+for (int i = 0; i < 8; i++) {
+  byte set = 0;
+  int bitsetIndex = i * 4;
+  set |= tempBucket[bitsetIndex] & ((byte) (mask[i] >>> 24));
+  set |= tempBucket[(bitsetIndex + 1)] & ((byte) (mask[i] >>> 16));
+  set |= tempBucket[(bitsetIndex + 2)] & ((byte) (mask[i] >>> 8));
+  set |= tempBucket[(bitsetIndex + 3)] & ((byte) mask[i]);
+  if (0 == set) {
+return false;
+  }
+}
+return true;
+  }
+
+  /**
+   * Merge this bloom filter with other one
+   * @param other
+   */
+  public void or(BloomFilter other) {
+int otherLength = other.byteBuf.capacity();
+int thisLength = this.byteBuf.capacity();
+assert otherLength == thisLength;
+//to avoid checking times of Byte

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-28 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205938123
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter
+ * Filters", the main theory is to construct tiny bucket bloom filters which
+ * benefit to the cpu cache and SIMD opcode.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
 
 Review comment:
   BYTES is right. will update that naming.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-28 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205938105
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter
+ * Filters", the main theory is to construct tiny bucket bloom filters which
+ * benefit to the cpu cache and SIMD opcode.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 
1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
+
+
+  public BloomFilter(DrillBuf byteBuf) {
+this.byteBuf = byteBuf;
+this.numBytes = byteBuf.capacity();
+this.byteBuf.writerIndex(numBytes);
+  }
+
+
+  public static int adjustByteSize(int numBytes) {
+if (numBytes < MINIMUM_BLOOM_SIZE) {
+  numBytes = MINIMUM_BLOOM_SIZE;
+}
+
+if (numBytes > DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE) {
+  numBytes = DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE;
+}
+
+// 32 bytes alignment, one bucket.
+numBytes = (numBytes + 0x1F) & (~0x1F);
+return numBytes;
+  }
+
+  private void setMask(int key) {
+final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 
0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
 
 Review comment:
   those 8 odd numbers was picked from Impala's implementation(see 
bloom-filter.h) . They act as salt values to participate in calculating the 
mask. I will add some comments to this method to make it clear.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-24 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204974446
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
 
 Review comment:
   Current implementation is one bloom filter one join column.  To your 
example, multi-column join , will generate two bloom filters.  The reason to 
this implementation is to achieve one vector column memory access by one  hash 
computation. But the Murmur hash 's complex computation ate up the pipeline 
performance , the result performance does not so good. So I will change it to 
your assumes implementation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-24 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204973381
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite 
to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * This class maintains context for the runtime join push down's filter 
management. It
+   * does a traversal of the physical operators by leveraging the root wrapper 
which indirectly
+   * holds the global PhysicalOperator tree and contains the minor fragment 
endpoints.
+   * @param workUnit
+   * @param drillbitContext
+   */
+  public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext 
drillbitContext) {
+this.rootWrapper = workUnit.getRootWrapper();
+this.dril

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-24 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204973345
 
 

 ##
 File path: exec/java-exec/src/main/resources/drill-module.conf
 ##
 @@ -455,6 +455,8 @@ drill.exec.options: {
 exec.hashjoin.num_partitions: 32,
 exec.hashjoin.num_rows_in_batch: 1024,
 exec.hashjoin.max_batches_in_memory: 0,
+exec.hashjoin.enable.runtime_filter: true,
 
 Review comment:
   agree


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-24 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204973201
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite 
to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * This class maintains context for the runtime join push down's filter 
management. It
+   * does a traversal of the physical operators by leveraging the root wrapper 
which indirectly
+   * holds the global PhysicalOperator tree and contains the minor fragment 
endpoints.
+   * @param workUnit
+   * @param drillbitContext
+   */
+  public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext 
drillbitContext) {
+this.rootWrapper = workUnit.getRootWrapper();
+this.dril

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-24 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204965485
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite 
to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * This class maintains context for the runtime join push down's filter 
management. It
+   * does a traversal of the physical operators by leveraging the root wrapper 
which indirectly
+   * holds the global PhysicalOperator tree and contains the minor fragment 
endpoints.
+   * @param workUnit
+   * @param drillbitContext
+   */
+  public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext 
drillbitContext) {
+this.rootWrapper = workUnit.getRootWrapper();
+this.dril

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-05 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r200521164
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import com.google.common.collect.Sets;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite 
to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private QueryContext queryContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * This class maintains context for the runtime join push down's filter 
management. It
+   * does a traversal of the physical operators by leveraging the root wrapper 
which indirectly
+   * holds the global PhysicalOperator tree and contains the minor fragment 
endpoints.
+   * @param workUnit
+   * @param queryContext
+   */
+  public Ru

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-05 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r200520720
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import com.google.common.collect.Sets;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * for which is JPPD (join predicate push down) is possible. The prerequisite 
to do JPPD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's 
number
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private QueryContext queryContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * This class maintains context for the runtime join push down's filter 
management. It
+   * does a traversal of the physical operators by leveraging the root wrapper 
which indirectly
+   * holds the global PhysicalOperator tree and contains the minor fragment 
endpoints.
+   * @param workUnit
+   * @param queryContext
+   */
+  public Ru

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-05 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r200520207
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
 ##
 @@ -375,6 +378,30 @@ protected void cleanup() {
 public FragmentExecutor getFragmentRunner(final FragmentHandle handle) {
   return runningFragments.get(handle);
 }
+
+public void receiveRuntimeFilter(final RuntimeFilterWritable 
runtimeFilter) {
+  BitData.RuntimeFilterBDef runtimeFilterDef = 
runtimeFilter.getRuntimeFilterBDef();
+  boolean toForeman = runtimeFilterDef.getToForeman();
+  QueryId queryId = runtimeFilterDef.getQueryId();
+  //to foreman
+  if (toForeman) {
+Foreman foreman = queries.get(queryId);
+if (foreman != null) {
 
 Review comment:
   Yes, it's supposed to handle the cases: the foreman has crashed or the whole 
query has completed ahead of time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-05 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r200519816
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
 ##
 @@ -76,7 +77,7 @@ public boolean inScopeOf(OptionScope scope) {
   }
 
   public enum Kind {
-BOOLEAN, LONG, STRING, DOUBLE
+BOOLEAN, LONG, STRING, DOUBLE, INTEGER
 
 Review comment:
   The reason to add an INTEGER option is that the largest BF's byte size is 
supposed to be Integer.MAX_VALUE which is limited by DrillBuf's size.  So 
INTEGER option would be precisely. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-05 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r200519330
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
 ##
 @@ -151,12 +151,12 @@ public RelOptCost computeSelfCost(final RelOptPlanner 
planner, RelMetadataQuery
 
   @Override
   public SelectionVectorMode[] getSupportedEncodings() {
-return SelectionVectorMode.DEFAULT;
+return SelectionVectorMode.NONE_AND_TWO;
   }
 
   @Override
   public SelectionVectorMode getEncoding() {
-return SelectionVectorMode.NONE;
+return SelectionVectorMode.TWO_BYTE;
 
 Review comment:
   What do you mean by `NONE_AND_TWO ` ,since the return is 
`SelectionVectorMode` not an array. If JPPD is not applicable, to `ScanBatch` 
,the output container's build schema would always be `SelectionVectorMode.NONE` 
, and will not generate SV2. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-05 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r200518460
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
 ##
 @@ -151,12 +151,12 @@ public RelOptCost computeSelfCost(final RelOptPlanner 
planner, RelMetadataQuery
 
   @Override
   public SelectionVectorMode[] getSupportedEncodings() {
-return SelectionVectorMode.DEFAULT;
+return SelectionVectorMode.NONE_AND_TWO;
 
 Review comment:
   Yes, I will update that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199395144
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -696,6 +780,18 @@ public void executeBuildPhase() throws 
SchemaChangeException {
 if ( cycleNum > 0 ) {
   read_right_HV_vector = (IntVector) 
buildBatch.getContainer().getLast();
 }
+//create runtime filter
+if (cycleNum == 0 && enableRuntimeFilter) {
+  //create runtime filter and send out async
+  int condFieldIndex = 0;
+  for (BloomFilter bloomFilter : bloomFilters) {
+for (int ind = 0; ind < currentRecordCount; ind++) {
+  long hashCode = hash64.hash64Code(ind, 0, condFieldIndex);
+  bloomFilter.insert(hashCode);
 
 Review comment:
   I will enhance this at another JIRA.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199374038
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
 ##
 @@ -32,35 +33,53 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.drill.exec.physical.base.AbstractJoinPop;
+import org.apache.drill.exec.work.filter.RuntimeFilterDef;
+
 
 @JsonTypeName("hash-join")
+@JsonIgnoreProperties(ignoreUnknown = true)
 
 Review comment:
   The primary reason is to backward compatible. PhysicalPlans without 
`runtimeFilterDef` property (maybe not support RuntimeFilter) can still work 
out. That is the `ignoreUnknown=true` usage.  The `@JsonIgnore` is to not 
output the property content as the final json. That's not our target.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199369589
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 ##
 @@ -226,6 +244,96 @@ public IterOutcome next() {
 }
   }
 
+  private void applyRuntimeFilter() throws SchemaChangeException {
+RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
+if (runtimeFilterWritable == null) {
+  return;
+}
+if (recordCount <= 0) {
+  return;
+}
+List bloomFilters = runtimeFilterWritable.unwrap();
+if (hash64 == null) {
+  ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(this, 
context);
+  try {
+//generate hash helper
+this.toFilterFields = 
runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList();
+List hashFieldExps = new ArrayList<>();
+List typedFieldIds = new ArrayList<>();
+for (String toFilterField : toFilterFields) {
+  SchemaPath schemaPath = new SchemaPath(new 
PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN);
+  TypedFieldId typedFieldId = container.getValueVectorId(schemaPath);
+  this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]);
+  typedFieldIds.add(typedFieldId);
+  ValueVectorReadExpression toHashFieldExp = new 
ValueVectorReadExpression(typedFieldId);
+  hashFieldExps.add(toHashFieldExp);
+}
+hash64 = hashHelper.getHash64(hashFieldExps.toArray(new 
LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new 
TypedFieldId[typedFieldIds.size()]));
+  } catch (Exception e) {
+throw UserException.internalError(e).build(logger);
+  }
+}
+selectionVector2.allocateNew(recordCount);
+BitSet bitSet = new BitSet(recordCount);
+for (int i = 0; i < toFilterFields.size(); i++) {
+  BloomFilter bloomFilter = bloomFilters.get(i);
+  String fieldName = toFilterFields.get(i);
+  computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+}
+int svIndex = 0;
+int tmpFilterRows = 0;
+for (int i = 0; i < recordCount; i++) {
+  boolean contain = bitSet.get(i);
+  if (contain) {
+selectionVector2.setIndex(svIndex, i);
+svIndex++;
+  } else {
+tmpFilterRows++;
+  }
+}
+selectionVector2.setRecordCount(svIndex);
+if (tmpFilterRows > 0 && tmpFilterRows == recordCount) {
+  recordCount = 0;
+  selectionVector2.clear();
+  logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows);
+  return;
+}
+if (tmpFilterRows > 0 && tmpFilterRows != recordCount ) {
+  totalFilterRows = totalFilterRows + tmpFilterRows;
+  recordCount = svIndex;
+  BatchSchema batchSchema = this.schema;
+  VectorContainer backUpContainer = new 
VectorContainer(this.oContext.getAllocator(), batchSchema);
+  int fieldCount = batchSchema.getFieldCount();
+  for (int i = 0; i < fieldCount; i++) {
+ValueVector from = 
this.getContainer().getValueVector(i).getValueVector();
+ValueVector to = backUpContainer.getValueVector(i).getValueVector();
+to.setInitialCapacity(svIndex);
+for (int r = 0; r < svIndex; r++) {
+  to.copyEntry(r, from, selectionVector2.getIndex(r));
 
 Review comment:
   @aman thanks for other valuable reviews ,will soon update that. To this 
point, I initially tend to output the SV2. But to fellow reasons, I give up: 
   
   * Not all the possible operators support the SelectionModel. If users’ rule 
pushed down the filter conditions,the output SV2 maybe not processed by the not 
supported operators(i.e. it’s not definitive  to have a operator which supports 
the SelectionModel above the Scan node).
   
   * The BatchSchema’s SelectionModel also becomes a runtime var.This will also 
affect the upper filter node’s code-gen logic to dynamically generate fresh 
filter codes to the Scan’s SV2.
   
   I agree that the memory copy cost will be less if the above filter node can 
filter more rows over the Scan’s SV2.So what your opinion about this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-06-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199113139
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
 ##
 @@ -32,35 +33,53 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.drill.exec.physical.base.AbstractJoinPop;
+import org.apache.drill.exec.work.filter.RuntimeFilterDef;
+
 
 @JsonTypeName("hash-join")
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class HashJoinPOP extends AbstractJoinPop {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HashJoinPOP.class);
 
+  private RuntimeFilterDef runtimeFilterDef;
 
 Review comment:
   RuntimeFilterDef was created after the physical plan was generated. 
Constructing the BloomFilter at HashJoin node is a heavy work, to a light 
weight join query (i.e. two small tables), it's not applicable to construct a 
bloom filter.So I left some TODO work at the 
RuntimeFilterManager.quelifiedHolders method to value whether applicable to 
apply the RuntimeFilter according to the NDV from both sides. The NDV 
statistics is not supported now. I will fire another work about the Metadata to 
support it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services