amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199376103
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 ##########
 @@ -226,6 +244,96 @@ public IterOutcome next() {
     }
   }
 
+  private void applyRuntimeFilter() throws SchemaChangeException {
+    RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
+    if (runtimeFilterWritable == null) {
+      return;
+    }
+    if (recordCount <= 0) {
+      return;
+    }
+    List<BloomFilter> bloomFilters = runtimeFilterWritable.unwrap();
+    if (hash64 == null) {
+      ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(this, 
context);
+      try {
+        //generate hash helper
+        this.toFilterFields = 
runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList();
+        List<LogicalExpression> hashFieldExps = new ArrayList<>();
+        List<TypedFieldId> typedFieldIds = new ArrayList<>();
+        for (String toFilterField : toFilterFields) {
+          SchemaPath schemaPath = new SchemaPath(new 
PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN);
+          TypedFieldId typedFieldId = container.getValueVectorId(schemaPath);
+          this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]);
+          typedFieldIds.add(typedFieldId);
+          ValueVectorReadExpression toHashFieldExp = new 
ValueVectorReadExpression(typedFieldId);
+          hashFieldExps.add(toHashFieldExp);
+        }
+        hash64 = hashHelper.getHash64(hashFieldExps.toArray(new 
LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new 
TypedFieldId[typedFieldIds.size()]));
+      } catch (Exception e) {
+        throw UserException.internalError(e).build(logger);
+      }
+    }
+    selectionVector2.allocateNew(recordCount);
+    BitSet bitSet = new BitSet(recordCount);
+    for (int i = 0; i < toFilterFields.size(); i++) {
+      BloomFilter bloomFilter = bloomFilters.get(i);
+      String fieldName = toFilterFields.get(i);
+      computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+    }
+    int svIndex = 0;
+    int tmpFilterRows = 0;
+    for (int i = 0; i < recordCount; i++) {
+      boolean contain = bitSet.get(i);
+      if (contain) {
+        selectionVector2.setIndex(svIndex, i);
+        svIndex++;
+      } else {
+        tmpFilterRows++;
+      }
+    }
+    selectionVector2.setRecordCount(svIndex);
+    if (tmpFilterRows > 0 && tmpFilterRows == recordCount) {
+      recordCount = 0;
+      selectionVector2.clear();
+      logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows);
+      return;
+    }
+    if (tmpFilterRows > 0 && tmpFilterRows != recordCount ) {
+      totalFilterRows = totalFilterRows + tmpFilterRows;
+      recordCount = svIndex;
+      BatchSchema batchSchema = this.schema;
+      VectorContainer backUpContainer = new 
VectorContainer(this.oContext.getAllocator(), batchSchema);
+      int fieldCount = batchSchema.getFieldCount();
+      for (int i = 0; i < fieldCount; i++) {
+        ValueVector from = 
this.getContainer().getValueVector(i).getValueVector();
+        ValueVector to = backUpContainer.getValueVector(i).getValueVector();
+        to.setInitialCapacity(svIndex);
+        for (int r = 0; r < svIndex; r++) {
+          to.copyEntry(r, from, selectionVector2.getIndex(r));
 
 Review comment:
   Hi @weijietong ,  the physical plan operators are supposed to implement 2 
APIs:  `getSupportedEncodings()` API (see [1]) which indicates whether or not 
it accepts an SelectionVector2 (other options are SV4 and NONE) and the 
corresponding `getEncoding()` API.  If an operator does not accept an SV2 or 
SV4, the planner will insert a SelectionVectorRemover just below that node 
which essentially does the copying of qualified rows.  Note that the SVRemover 
is implemented by RemovingRecordBatch [2] which uses a `StraightCopier` 
whenever the child has NONE, so in that case it just does a simple transfer 
instead of copy.  
   
   In the case of Filter,  it can accept both NONE and SV2 because it is 
possible in some cases to have a filter on top of another filter (with some 
intermediate non-blocking operator).  
   
   Let me know if this makes sense.   The main reason I am proposing this is 
the Filter-Scan is a very common pattern and we would want to minimize the copy 
overhead as much as possible.  Unfortunately, I am tied up with some other work 
otherwise I could make the changes on top of your branch to experiment. 
   
   [1] 
https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java#L51
   [2] 
https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java#L57

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to