weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199369589
##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
##########
@@ -226,6 +244,96 @@ public IterOutcome next() {
}
}
+  private void applyRuntimeFilter() throws SchemaChangeException {
+    RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
+    if (runtimeFilterWritable == null) {
+      return;
+    }
+    if (recordCount <= 0) {
+      return;
+    }
+    List<BloomFilter> bloomFilters = runtimeFilterWritable.unwrap();
+    if (hash64 == null) {
+      ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(this, context);
+      try {
+        // generate hash helper
+        this.toFilterFields = runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList();
+        List<LogicalExpression> hashFieldExps = new ArrayList<>();
+        List<TypedFieldId> typedFieldIds = new ArrayList<>();
+        for (String toFilterField : toFilterFields) {
+          SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN);
+          TypedFieldId typedFieldId = container.getValueVectorId(schemaPath);
+          this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]);
+          typedFieldIds.add(typedFieldId);
+          ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId);
+          hashFieldExps.add(toHashFieldExp);
+        }
+        hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()]));
+      } catch (Exception e) {
+        throw UserException.internalError(e).build(logger);
+      }
+    }
+    selectionVector2.allocateNew(recordCount);
+    BitSet bitSet = new BitSet(recordCount);
+    for (int i = 0; i < toFilterFields.size(); i++) {
+      BloomFilter bloomFilter = bloomFilters.get(i);
+      String fieldName = toFilterFields.get(i);
+      computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+    }
+    int svIndex = 0;
+    int tmpFilterRows = 0;
+    for (int i = 0; i < recordCount; i++) {
+      boolean contain = bitSet.get(i);
+      if (contain) {
+        selectionVector2.setIndex(svIndex, i);
+        svIndex++;
+      } else {
+        tmpFilterRows++;
+      }
+    }
+    selectionVector2.setRecordCount(svIndex);
+    if (tmpFilterRows > 0 && tmpFilterRows == recordCount) {
+      recordCount = 0;
+      selectionVector2.clear();
+      logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows);
+      return;
+    }
+    if (tmpFilterRows > 0 && tmpFilterRows != recordCount) {
+      totalFilterRows = totalFilterRows + tmpFilterRows;
+      recordCount = svIndex;
+      BatchSchema batchSchema = this.schema;
+      VectorContainer backUpContainer = new VectorContainer(this.oContext.getAllocator(), batchSchema);
+      int fieldCount = batchSchema.getFieldCount();
+      for (int i = 0; i < fieldCount; i++) {
+        ValueVector from = this.getContainer().getValueVector(i).getValueVector();
+        ValueVector to = backUpContainer.getValueVector(i).getValueVector();
+        to.setInitialCapacity(svIndex);
+        for (int r = 0; r < svIndex; r++) {
+          to.copyEntry(r, from, selectionVector2.getIndex(r));
Review comment:
@aman thanks for the other valuable review comments, I will update them soon. On this point, I initially intended to output the SV2 directly, but gave that up for the following reasons:
* Not every possible operator supports the selection vector mode. If a user's rule pushes the filter conditions down, the emitted SV2 may be consumed by an operator that does not support it (i.e. it is not guaranteed that an operator supporting the selection vector mode sits directly above the Scan node).
* The BatchSchema's SelectionVectorMode would also become a runtime variable. This in turn affects the upper Filter node's code-gen logic, which would have to dynamically generate fresh filter code against the Scan's SV2.
I agree that the memory copy cost would be lower if the Filter node above could filter additional rows on top of the Scan's SV2; the sketch below illustrates the trade-off. So what is your opinion on this?
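For reference, here is a minimal, self-contained sketch of the two options being compared. It uses plain Java arrays and a BitSet as stand-ins for Drill's ValueVector, SelectionVector2, and the bloom-filter result; the class and method names are illustrative only, not Drill's actual API.

```java
import java.util.Arrays;
import java.util.BitSet;

/**
 * Illustrative only: contrasts emitting a selection vector (SV2-style)
 * with compacting the batch by copying the surviving rows.
 */
public class RuntimeFilterSketch {

  /** Option A: leave the data untouched and emit the surviving row indices. */
  static int[] emitSelectionVector(BitSet keep, int recordCount) {
    int[] sv2 = new int[keep.cardinality()];
    int svIndex = 0;
    for (int i = 0; i < recordCount; i++) {
      if (keep.get(i)) {
        sv2[svIndex++] = i; // downstream operators must dereference rows through these indices
      }
    }
    return sv2;
  }

  /** Option B: copy the surviving rows into a densely packed column (one such copy per ValueVector). */
  static long[] compactByCopy(long[] column, BitSet keep, int recordCount) {
    long[] compacted = new long[keep.cardinality()];
    int out = 0;
    for (int i = 0; i < recordCount; i++) {
      if (keep.get(i)) {
        compacted[out++] = column[i]; // one copy per surviving row and per column
      }
    }
    return compacted; // downstream operators need no SV2 awareness
  }

  public static void main(String[] args) {
    long[] column = {10, 20, 30, 40, 50};
    BitSet keep = new BitSet(column.length);
    keep.set(1); // pretend the bloom filter only matched rows 1 and 3
    keep.set(3);

    System.out.println("SV2 indices: " + Arrays.toString(emitSelectionVector(keep, column.length)));   // [1, 3]
    System.out.println("Compacted:   " + Arrays.toString(compactByCopy(column, keep, column.length))); // [20, 40]
  }
}
```

With the SV2-style option the copy cost is deferred, and possibly avoided for rows an upper Filter would drop anyway, but the batch's selection vector mode becomes a runtime property, which is exactly the code-gen concern in the second bullet above. The copy option keeps the Scan's output shape stable for all downstream operators at the cost of one copy per surviving row.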