amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199376103
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
##########
@@ -226,6 +244,96 @@ public IterOutcome next() {
}
}
+ private void applyRuntimeFilter() throws SchemaChangeException {
+ RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
+ if (runtimeFilterWritable == null) {
+ return;
+ }
+ if (recordCount <= 0) {
+ return;
+ }
+ List<BloomFilter> bloomFilters = runtimeFilterWritable.unwrap();
+ if (hash64 == null) {
+ ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(this, context);
+ try {
+ //generate hash helper
+ this.toFilterFields = runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList();
+ List<LogicalExpression> hashFieldExps = new ArrayList<>();
+ List<TypedFieldId> typedFieldIds = new ArrayList<>();
+ for (String toFilterField : toFilterFields) {
+ SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN);
+ TypedFieldId typedFieldId = container.getValueVectorId(schemaPath);
+ this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]);
+ typedFieldIds.add(typedFieldId);
+ ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId);
+ hashFieldExps.add(toHashFieldExp);
+ }
+ hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()]));
+ } catch (Exception e) {
+ throw UserException.internalError(e).build(logger);
+ }
+ }
+ selectionVector2.allocateNew(recordCount);
+ BitSet bitSet = new BitSet(recordCount);
+ for (int i = 0; i < toFilterFields.size(); i++) {
+ BloomFilter bloomFilter = bloomFilters.get(i);
+ String fieldName = toFilterFields.get(i);
+ computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+ }
+ int svIndex = 0;
+ int tmpFilterRows = 0;
+ for (int i = 0; i < recordCount; i++) {
+ boolean contain = bitSet.get(i);
+ if (contain) {
+ selectionVector2.setIndex(svIndex, i);
+ svIndex++;
+ } else {
+ tmpFilterRows++;
+ }
+ }
+ selectionVector2.setRecordCount(svIndex);
+ if (tmpFilterRows > 0 && tmpFilterRows == recordCount) {
+ recordCount = 0;
+ selectionVector2.clear();
+ logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows);
+ return;
+ }
+ if (tmpFilterRows > 0 && tmpFilterRows != recordCount) {
+ totalFilterRows = totalFilterRows + tmpFilterRows;
+ recordCount = svIndex;
+ BatchSchema batchSchema = this.schema;
+ VectorContainer backUpContainer = new VectorContainer(this.oContext.getAllocator(), batchSchema);
+ int fieldCount = batchSchema.getFieldCount();
+ for (int i = 0; i < fieldCount; i++) {
+ ValueVector from = this.getContainer().getValueVector(i).getValueVector();
+ ValueVector to = backUpContainer.getValueVector(i).getValueVector();
+ to.setInitialCapacity(svIndex);
+ for (int r = 0; r < svIndex; r++) {
+ to.copyEntry(r, from, selectionVector2.getIndex(r));
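The core of the diff above probes each row against the bloom filters, records survivors in a `BitSet`, and then compacts the surviving row indices into the SelectionVector2. That compaction step can be sketched outside of Drill with plain arrays; `Sv2Sketch`, `compact`, and `survivors` below are illustrative stand-ins for the `selectionVector2.setIndex(...)` loop, not Drill APIs:

```java
import java.util.BitSet;

public class Sv2Sketch {
    // Compact the indices of rows whose bit is set into a small "selection
    // vector": survivors[k] holds the original row index of the k-th kept row.
    // Returns the number of surviving rows (the SV2 record count).
    static int compact(BitSet keep, int recordCount, int[] survivors) {
        int svIndex = 0;
        for (int i = 0; i < recordCount; i++) {
            if (keep.get(i)) {
                survivors[svIndex++] = i;
            }
        }
        return svIndex;
    }

    public static void main(String[] args) {
        int recordCount = 6;
        // Pretend the bloom-filter probe marked rows 1, 3 and 4 as possible matches.
        BitSet keep = new BitSet(recordCount);
        keep.set(1);
        keep.set(3);
        keep.set(4);

        int[] survivors = new int[recordCount];
        int kept = compact(keep, recordCount, survivors);
        System.out.println(kept);                                        // 3
        System.out.println(survivors[0] + "," + survivors[1] + "," + survivors[2]); // 1,3,4
    }
}
```

Downstream operators then read the batch indirectly through `survivors` (the SV2), which is what makes the later per-row `copyEntry` materialization in the diff avoidable if the consumer understands SV2 directly.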
Review comment:
Hi @weijietong , physical plan operators are supposed to implement two
APIs: the `getSupportedEncodings()` API (see [1]), which indicates whether or not
an operator accepts a SelectionVector2 (the other options are SV4 and NONE), and the
corresponding `getEncoding()` API. If an operator does not accept an SV2 or
SV4, the planner will insert a SelectionVectorRemover just below that node,
which essentially does the copying of the qualified rows. Note that the SVRemover
is implemented by RemovingRecordBatch [2], which uses a `StraightCopier`
whenever the child has NONE, so in that case it just does a simple transfer
instead of a copy.
In the case of Filter, it can accept both NONE and SV2 because it is
possible in some cases to have a filter on top of another filter (with some
intermediate non-blocking operator).
Let me know if this makes sense. The main reason I am proposing this is
that Filter-Scan is a very common pattern and we would want to minimize the copy
overhead as much as possible. Unfortunately, I am tied up with some other work;
otherwise I could make the changes on top of your branch to experiment.
[1]
https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java#L51
[2]
https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java#L57
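The encoding negotiation described above can be modeled in a few lines. To be clear, `Op`, `op`, and `needsRemover` below are simplified, hypothetical stand-ins for the `Prel.getSupportedEncodings()` / `getEncoding()` contract and the planner's SVRemover insertion rule, not the actual Drill classes:

```java
import java.util.EnumSet;

public class EncodingSketch {

    enum SelectionVectorMode { NONE, TWO_BYTE, FOUR_BYTE }

    // Simplified stand-in for the Prel contract: each operator declares which
    // input encodings it accepts and which encoding its own output carries.
    interface Op {
        EnumSet<SelectionVectorMode> supportedEncodings(); // cf. getSupportedEncodings()
        SelectionVectorMode encoding();                    // cf. getEncoding()
    }

    static Op op(EnumSet<SelectionVectorMode> supported, SelectionVectorMode out) {
        return new Op() {
            public EnumSet<SelectionVectorMode> supportedEncodings() { return supported; }
            public SelectionVectorMode encoding() { return out; }
        };
    }

    // The planner would insert a SelectionVectorRemover below `parent` exactly
    // when the child's output encoding is not among the parent's supported ones.
    static boolean needsRemover(Op parent, Op child) {
        return !parent.supportedEncodings().contains(child.encoding());
    }

    public static void main(String[] args) {
        // A scan that emits an SV2, e.g. after applying a runtime filter.
        Op sv2Scan = op(EnumSet.of(SelectionVectorMode.NONE), SelectionVectorMode.TWO_BYTE);
        // A project that only understands plain (NONE) batches.
        Op project = op(EnumSet.of(SelectionVectorMode.NONE), SelectionVectorMode.NONE);
        // A filter that accepts both NONE and SV2, as described above.
        Op filter = op(EnumSet.of(SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE),
                       SelectionVectorMode.NONE);

        System.out.println(needsRemover(project, sv2Scan)); // true  -> SVRemover needed
        System.out.println(needsRemover(filter, sv2Scan));  // false -> filter consumes SV2 directly
    }
}
```

The second case is the point of the suggestion: if the Filter above the Scan declares SV2 among its supported encodings, the row-by-row copy in the Scan becomes unnecessary.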
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services