This is an automated email from the ASF dual-hosted git repository. amansinha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 7571d52eab9b961687df7d4fb845d0a297b228bb Author: Aman Sinha <asi...@maprtech.com> AuthorDate: Sat Oct 13 23:38:17 2018 -0700 DRILL-6381: Address code review comments (part 3). DRILL-6381: Add missing joinControl logic for INTERSECT_DISTINCT. - Modified HashJoin's probe phase to process INTERSECT_DISTINCT. - NOTE: For build phase, the functionality will be same as for SemiJoin when it is added later. DRILL-6381: Address code review comment for intersect_distinct. DRILL-6381: Rebase on latest master and fix compilation issues. DRILL-6381: Generate protobuf files for C++ native client. DRILL-6381: Use shaded Guava classes. Add more comments and Javadoc. --- .../planner/index/MapRDBFunctionalIndexInfo.java | 4 +- .../exec/planner/index/MapRDBIndexDiscover.java | 12 ++--- .../drill/exec/store/mapr/PluginConstants.java | 3 ++ .../exec/store/mapr/db/MapRDBFormatPlugin.java | 6 ++- .../store/mapr/db/MapRDBPushFilterIntoScan.java | 17 ++++--- .../store/mapr/db/MapRDBPushLimitIntoScan.java | 2 +- .../store/mapr/db/MapRDBPushProjectIntoScan.java | 35 ++++++------- .../mapr/db/MapRDBRestrictedScanBatchCreator.java | 4 +- .../db/json/JsonTableRangePartitionFunction.java | 4 +- .../store/mapr/db/json/MaprDBJsonRecordReader.java | 2 +- .../mapr/db/json/RestrictedJsonTableGroupScan.java | 6 +-- contrib/native/client/src/protobuf/BitData.pb.cc | 55 ++++++++++++++++++--- contrib/native/client/src/protobuf/BitData.pb.h | 36 +++++++++++++- .../native/client/src/protobuf/UserBitShared.pb.cc | 57 +++++++++++----------- .../native/client/src/protobuf/UserBitShared.pb.h | 7 +-- .../exec/store/hbase/HBasePushFilterIntoScan.java | 2 +- .../store/kafka/KafkaPushDownFilterIntoScan.java | 4 +- .../store/mongo/MongoPushDownFilterForScan.java | 2 +- .../apache/drill/exec/physical/impl/ScanBatch.java | 17 +++---- .../exec/physical/impl/common/HashPartition.java | 10 ++-- .../physical/impl/join/HashJoinProbeTemplate.java | 24 +++++++-- .../exec/physical/impl/join/RowKeyJoinBatch.java | 6 +++ .../physical/impl/partitionsender/Partitioner.java | 3 ++ .../RangePartitionRecordBatch.java | 7 +++ .../exec/planner/common/DrillScanRelBase.java | 2 +- .../index/InvalidIndexDefinitionException.java | 4 ++ .../generators/IndexIntersectPlanGenerator.java | 18 +++---- .../planner/logical/DrillMergeProjectRule.java | 8 +++ .../planner/physical/BroadcastExchangePrel.java | 5 +- .../planner/physical/ConvertCountToDirectScan.java | 5 +- .../drill/exec/planner/physical/ScanPrel.java | 9 ---- .../drill/exec/planner/physical/ScanPrule.java | 2 +- .../InfoSchemaPushFilterIntoRecordGenerator.java | 2 +- .../exec/store/parquet/ParquetPushDownFilter.java | 2 +- .../physical/impl/common/HashPartitionTest.java | 2 +- 35 files changed, 248 insertions(+), 136 deletions(-) diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java index 67938f3..564a037 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java @@ -17,8 +17,8 @@ */ package org.apache.drill.exec.planner.index; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; +import org.apache.drill.shaded.guava.com.google.common.collect.Sets; import org.apache.drill.common.expression.CastExpression; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java index c231e11..aed3e04 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java @@ -100,7 +100,7 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco for (IndexDesc idx : indexes) { DrillIndexDescriptor hbaseIdx = buildIndexDescriptor(tableName, idx); if (hbaseIdx == null) { - //not able to build a valid index based on the index info from MFS + // not able to build a valid index based on the index info from MFS logger.error("Not able to build index for {}", idx.toString()); continue; } @@ -233,9 +233,9 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco } private LogicalExpression castFunctionSQLSyntax(String field, String type) throws InvalidIndexDefinitionException { - //get castTypeStr so we can construct SQL syntax string before MapRDB could provide such syntax + // get castTypeStr so we can construct SQL syntax string before MapRDB could provide such syntax String castTypeStr = getDrillTypeStr(type); - if(castTypeStr == null) {//no cast + if(castTypeStr == null) { // no cast throw new InvalidIndexDefinitionException("cast function type not recognized: " + type + "for field " + field); } try { @@ -255,7 +255,7 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco private LogicalExpression getIndexExpression(IndexFieldDesc desc) throws InvalidIndexDefinitionException { final String fieldName = desc.getFieldPath().asPathString(); final String functionDef = desc.getFunctionName(); - if ((functionDef != null)) {//this is a function + if ((functionDef != null)) { // this is a function String[] tokens = functionDef.split("\\s+"); if (tokens[0].equalsIgnoreCase("cast")) { if (tokens.length != 3) { @@ -270,7 +270,7 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco throw new InvalidIndexDefinitionException("function definition is not supported for indexing: " + functionDef); } } - //else it is a schemaPath + // else it is a schemaPath return fieldName2SchemaPath(fieldName); } @@ -285,7 +285,7 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco private List<RelFieldCollation> getFieldCollations(IndexDesc desc, Collection<IndexFieldDesc> descCollection) { List<RelFieldCollation> fieldCollations = new ArrayList<>(); - int i=0; + int i = 0; for (IndexFieldDesc field : descCollection) { RelFieldCollation.Direction direction = (field.getSortOrder() == IndexFieldDesc.Order.Asc) ? RelFieldCollation.Direction.ASCENDING : (field.getSortOrder() == IndexFieldDesc.Order.Desc ? diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java index 4239c5d..7a175a2 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java @@ -53,6 +53,9 @@ public class PluginConstants { public static final int JSON_TABLE_NUM_TABLETS_PER_INDEX_DEFAULT = 32; + public static final int JSON_TABLE_SCAN_SIZE_MB_MIN = 32; + public static final int JSON_TABLE_SCAN_SIZE_MB_MAX = 8192; + public static final String JSON_TABLE_SCAN_SIZE_MB = "format-maprdb.json.scanSizeMB"; public static final int JSON_TABLE_SCAN_SIZE_MB_DEFAULT = 128; diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java index 0d1bf04..fc8a057 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java @@ -66,13 +66,15 @@ public class MapRDBFormatPlugin extends TableFormatPlugin { connection = ConnectionFactory.createConnection(hbaseConf); jsonTableCache = new MapRDBTableCache(context.getConfig()); int scanRangeSizeMBConfig = context.getConfig().getInt(PluginConstants.JSON_TABLE_SCAN_SIZE_MB); - if (scanRangeSizeMBConfig < 32 || scanRangeSizeMBConfig > 8192) { + if (scanRangeSizeMBConfig < PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MIN || + scanRangeSizeMBConfig > PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MAX) { logger.warn("Invalid scan size {} for MapR-DB tables, using default", scanRangeSizeMBConfig); scanRangeSizeMBConfig = PluginConstants.JSON_TABLE_SCAN_SIZE_MB_DEFAULT; } int restrictedScanRangeSizeMBConfig = context.getConfig().getInt(PluginConstants.JSON_TABLE_RESTRICTED_SCAN_SIZE_MB); - if (restrictedScanRangeSizeMBConfig < 32 || restrictedScanRangeSizeMBConfig > 8192) { + if (restrictedScanRangeSizeMBConfig < PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MIN || + restrictedScanRangeSizeMBConfig > PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MAX) { logger.warn("Invalid restricted scan size {} for MapR-DB tables, using default", restrictedScanRangeSizeMBConfig); restrictedScanRangeSizeMBConfig = PluginConstants.JSON_TABLE_RESTRICTED_SCAN_SIZE_MB_DEFAULT; } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java index 511a111..a0f5536 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java @@ -51,8 +51,9 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul @Override public void onMatch(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel) call.rel(1); - final FilterPrel filter = (FilterPrel) call.rel(0); + final FilterPrel filter = call.rel(0); + final ScanPrel scan = call.rel(1); + final RexNode condition = filter.getCondition(); if (scan.getGroupScan() instanceof BinaryTableGroupScan) { @@ -80,9 +81,9 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul @Override public void onMatch(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel) call.rel(2); - final ProjectPrel project = (ProjectPrel) call.rel(1); - final FilterPrel filter = (FilterPrel) call.rel(0); + final FilterPrel filter = call.rel(0); + final ProjectPrel project = call.rel(1); + final ScanPrel scan = call.rel(2); // convert the filter to one that references the child of the project final RexNode condition = RelOptUtil.pushPastProject(filter.getCondition(), project); @@ -134,13 +135,13 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul final JsonConditionBuilder jsonConditionBuilder = new JsonConditionBuilder(groupScan, conditionExp); final JsonScanSpec newScanSpec = jsonConditionBuilder.parseTree(); if (newScanSpec == null) { - return; //no filter pushdown ==> No transformation. + return; // no filter pushdown ==> No transformation. } final JsonTableGroupScan newGroupsScan = (JsonTableGroupScan) groupScan.clone(newScanSpec); newGroupsScan.setFilterPushedDown(true); - final ScanPrel newScanPrel = new ScanPrel(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable()); + final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable()); // Depending on whether is a project in the middle, assign either scan or copy of project to childRel. final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel)); @@ -186,7 +187,7 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul groupScan.getTableStats()); newGroupsScan.setFilterPushedDown(true); - final ScanPrel newScanPrel = new ScanPrel(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable()); + final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable()); // Depending on whether is a project in the middle, assign either scan or copy of project to childRel. final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));; diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java index a26bc80..28d59d0 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java @@ -47,8 +47,8 @@ public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule @Override public void onMatch(RelOptRuleCall call) { - final ScanPrel scan = call.rel(1); final LimitPrel limit = call.rel(0); + final ScanPrel scan = call.rel(1); doPushLimitIntoGroupScan(call, limit, null, scan, scan.getGroupScan()); } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java index 5215868..d8d0a2c 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.store.mapr.db; -import com.google.common.collect.Lists; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptRuleOperand; import org.apache.calcite.plan.RelTrait; @@ -30,18 +30,21 @@ import org.apache.calcite.rex.RexNode; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.planner.common.DrillRelOptUtil; import org.apache.drill.exec.planner.logical.RelOptHelper; -import org.apache.drill.exec.planner.common.DrillRelOptUtil; -import org.apache.drill.exec.planner.common.DrillRelOptUtil.ProjectPushInfo; import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.planner.physical.ProjectPrel; import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.store.StoragePluginOptimizerRule; -import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan; import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan; import org.apache.drill.exec.util.Utilities; import java.util.List; +/** + * Push a physical Project into Scan. Currently, this rule is only doing projection pushdown for MapRDB-JSON tables + * since it was needed for the secondary index feature which only applies to Json tables. + * For binary tables, note that the DrillPushProjectIntoScanRule is still applicable during the logical + * planning phase. + */ public abstract class MapRDBPushProjectIntoScan extends StoragePluginOptimizerRule { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushProjectIntoScan.class); @@ -53,17 +56,10 @@ public abstract class MapRDBPushProjectIntoScan extends StoragePluginOptimizerRu RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushProjIntoScan:Proj_On_Scan") { @Override public void onMatch(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel) call.rel(1); - final ProjectPrel project = (ProjectPrel) call.rel(0); - if (!(scan.getGroupScan() instanceof MapRDBGroupScan)) { - return; - } - doPushProjectIntoGroupScan(call, project, scan, (MapRDBGroupScan) scan.getGroupScan()); - if (scan.getGroupScan() instanceof BinaryTableGroupScan) { - BinaryTableGroupScan groupScan = (BinaryTableGroupScan) scan.getGroupScan(); + final ProjectPrel project = call.rel(0); + final ScanPrel scan = call.rel(1); - } else { - assert (scan.getGroupScan() instanceof JsonTableGroupScan); + if (scan.getGroupScan() instanceof JsonTableGroupScan) { JsonTableGroupScan groupScan = (JsonTableGroupScan) scan.getGroupScan(); doPushProjectIntoGroupScan(call, project, scan, groupScan); @@ -72,9 +68,10 @@ public abstract class MapRDBPushProjectIntoScan extends StoragePluginOptimizerRu @Override public boolean matches(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel) call.rel(1); - if (scan.getGroupScan() instanceof BinaryTableGroupScan || - scan.getGroupScan() instanceof JsonTableGroupScan) { + final ScanPrel scan = call.rel(1); + + // See class level comments above for why only JsonGroupScan is considered + if (scan.getGroupScan() instanceof JsonTableGroupScan) { return super.matches(call); } return false; @@ -82,12 +79,12 @@ public abstract class MapRDBPushProjectIntoScan extends StoragePluginOptimizerRu }; protected void doPushProjectIntoGroupScan(RelOptRuleCall call, - ProjectPrel project, ScanPrel scan, MapRDBGroupScan groupScan) { + ProjectPrel project, ScanPrel scan, JsonTableGroupScan groupScan) { try { DrillRelOptUtil.ProjectPushInfo columnInfo = DrillRelOptUtil.getFieldsInformation(scan.getRowType(), project.getProjects()); - if (columnInfo == null || Utilities.isStarQuery(columnInfo.getFields()) // + if (columnInfo == null || Utilities.isStarQuery(columnInfo.getFields()) || !groupScan.canPushdownProjects(columnInfo.getFields())) { return; } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java index 89ce95d..cb3732a 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java @@ -17,8 +17,8 @@ */ package org.apache.drill.exec.store.mapr.db; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java index c0b73ee..ca508ca 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java @@ -30,8 +30,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import com.mapr.db.Table; import com.mapr.db.impl.ConditionImpl; import com.mapr.db.impl.IdCodec; diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java index 63a9381..0be44e8 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java @@ -236,7 +236,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { idOnly = (scannedFields == null); } - if(projectWholeDocument) { + if (projectWholeDocument) { projector = new FieldProjector(projectedFieldsSet); } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java index 2f06d00..055c5a5 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java @@ -24,8 +24,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.GroupScan; @@ -57,7 +57,7 @@ public class RestrictedJsonTableGroupScan extends JsonTableGroupScan { @JsonProperty("format") MapRDBFormatPlugin formatPlugin, @JsonProperty("scanSpec") JsonScanSpec scanSpec, /* scan spec of the original table */ @JsonProperty("columns") List<SchemaPath> columns, - @JsonProperty("")MapRDBStatistics statistics) { + @JsonProperty("") MapRDBStatistics statistics) { super(userName, storagePlugin, formatPlugin, scanSpec, columns, statistics); } diff --git a/contrib/native/client/src/protobuf/BitData.pb.cc b/contrib/native/client/src/protobuf/BitData.pb.cc index b32509c..ddee323 100644 --- a/contrib/native/client/src/protobuf/BitData.pb.cc +++ b/contrib/native/client/src/protobuf/BitData.pb.cc @@ -99,13 +99,14 @@ void protobuf_AssignDesc_BitData_2eproto() { ::google::protobuf::MessageFactory::generated_factory(), sizeof(FragmentRecordBatch)); RuntimeFilterBDef_descriptor_ = file->message_type(3); - static const int RuntimeFilterBDef_offsets_[6] = { + static const int RuntimeFilterBDef_offsets_[7] = { GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, query_id_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, major_fragment_id_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, minor_fragment_id_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, to_foreman_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, bloom_filter_size_in_bytes_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, probe_fields_), + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, hj_op_id_), }; RuntimeFilterBDef_reflection_ = new ::google::protobuf::internal::GeneratedMessageReflection( @@ -177,16 +178,16 @@ void protobuf_AddDesc_BitData_2eproto() { " \003(\005\022!\n\031sending_major_fragment_id\030\004 \001(\005\022" "!\n\031sending_minor_fragment_id\030\005 \001(\005\022(\n\003de" "f\030\006 \001(\0132\033.exec.shared.RecordBatchDef\022\023\n\013" - "isLastBatch\030\007 \001(\010\"\277\001\n\021RuntimeFilterBDef\022" + "isLastBatch\030\007 \001(\010\"\321\001\n\021RuntimeFilterBDef\022" "&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022" "\031\n\021major_fragment_id\030\002 \001(\005\022\031\n\021minor_frag" "ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo" "om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" - "ields\030\006 \003(\t*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n" - "\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH\020" - "\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILTE" - "R\020\005B(\n\033org.apache.drill.exec.protoB\007BitD" - "ataH\001", 885); + "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005*n\n\007RpcType" + "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n" + "\020REQ_RECORD_BATCH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n" + "\022REQ_RUNTIME_FILTER\020\005B(\n\033org.apache.dril" + "l.exec.protoB\007BitDataH\001", 903); ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( "BitData.proto", &protobuf_RegisterTypes); BitClientHandshake::default_instance_ = new BitClientHandshake(); @@ -1208,6 +1209,7 @@ const int RuntimeFilterBDef::kMinorFragmentIdFieldNumber; const int RuntimeFilterBDef::kToForemanFieldNumber; const int RuntimeFilterBDef::kBloomFilterSizeInBytesFieldNumber; const int RuntimeFilterBDef::kProbeFieldsFieldNumber; +const int RuntimeFilterBDef::kHjOpIdFieldNumber; #endif // !_MSC_VER RuntimeFilterBDef::RuntimeFilterBDef() @@ -1231,6 +1233,7 @@ void RuntimeFilterBDef::SharedCtor() { major_fragment_id_ = 0; minor_fragment_id_ = 0; to_foreman_ = false; + hj_op_id_ = 0; ::memset(_has_bits_, 0, sizeof(_has_bits_)); } @@ -1273,6 +1276,7 @@ void RuntimeFilterBDef::Clear() { major_fragment_id_ = 0; minor_fragment_id_ = 0; to_foreman_ = false; + hj_op_id_ = 0; } bloom_filter_size_in_bytes_.Clear(); probe_fields_.Clear(); @@ -1384,6 +1388,22 @@ bool RuntimeFilterBDef::MergePartialFromCodedStream( goto handle_uninterpreted; } if (input->ExpectTag(50)) goto parse_probe_fields; + if (input->ExpectTag(56)) goto parse_hj_op_id; + break; + } + + // optional int32 hj_op_id = 7; + case 7: { + if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) == + ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) { + parse_hj_op_id: + DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>( + input, &hj_op_id_))); + set_has_hj_op_id(); + } else { + goto handle_uninterpreted; + } if (input->ExpectAtEnd()) return true; break; } @@ -1442,6 +1462,11 @@ void RuntimeFilterBDef::SerializeWithCachedSizes( 6, this->probe_fields(i), output); } + // optional int32 hj_op_id = 7; + if (has_hj_op_id()) { + ::google::protobuf::internal::WireFormatLite::WriteInt32(7, this->hj_op_id(), output); + } + if (!unknown_fields().empty()) { ::google::protobuf::internal::WireFormat::SerializeUnknownFields( unknown_fields(), output); @@ -1487,6 +1512,11 @@ void RuntimeFilterBDef::SerializeWithCachedSizes( WriteStringToArray(6, this->probe_fields(i), target); } + // optional int32 hj_op_id = 7; + if (has_hj_op_id()) { + target = ::google::protobuf::internal::WireFormatLite::WriteInt32ToArray(7, this->hj_op_id(), target); + } + if (!unknown_fields().empty()) { target = ::google::protobuf::internal::WireFormat::SerializeUnknownFieldsToArray( unknown_fields(), target); @@ -1524,6 +1554,13 @@ int RuntimeFilterBDef::ByteSize() const { total_size += 1 + 1; } + // optional int32 hj_op_id = 7; + if (has_hj_op_id()) { + total_size += 1 + + ::google::protobuf::internal::WireFormatLite::Int32Size( + this->hj_op_id()); + } + } // repeated int32 bloom_filter_size_in_bytes = 5; { @@ -1582,6 +1619,9 @@ void RuntimeFilterBDef::MergeFrom(const RuntimeFilterBDef& from) { if (from.has_to_foreman()) { set_to_foreman(from.to_foreman()); } + if (from.has_hj_op_id()) { + set_hj_op_id(from.hj_op_id()); + } } mutable_unknown_fields()->MergeFrom(from.unknown_fields()); } @@ -1611,6 +1651,7 @@ void RuntimeFilterBDef::Swap(RuntimeFilterBDef* other) { std::swap(to_foreman_, other->to_foreman_); bloom_filter_size_in_bytes_.Swap(&other->bloom_filter_size_in_bytes_); probe_fields_.Swap(&other->probe_fields_); + std::swap(hj_op_id_, other->hj_op_id_); std::swap(_has_bits_[0], other->_has_bits_[0]); _unknown_fields_.Swap(&other->_unknown_fields_); std::swap(_cached_size_, other->_cached_size_); diff --git a/contrib/native/client/src/protobuf/BitData.pb.h b/contrib/native/client/src/protobuf/BitData.pb.h index 8a0b60c..7ee0bc6 100644 --- a/contrib/native/client/src/protobuf/BitData.pb.h +++ b/contrib/native/client/src/protobuf/BitData.pb.h @@ -521,6 +521,13 @@ class RuntimeFilterBDef : public ::google::protobuf::Message { inline const ::google::protobuf::RepeatedPtrField< ::std::string>& probe_fields() const; inline ::google::protobuf::RepeatedPtrField< ::std::string>* mutable_probe_fields(); + // optional int32 hj_op_id = 7; + inline bool has_hj_op_id() const; + inline void clear_hj_op_id(); + static const int kHjOpIdFieldNumber = 7; + inline ::google::protobuf::int32 hj_op_id() const; + inline void set_hj_op_id(::google::protobuf::int32 value); + // @@protoc_insertion_point(class_scope:exec.bit.data.RuntimeFilterBDef) private: inline void set_has_query_id(); @@ -531,6 +538,8 @@ class RuntimeFilterBDef : public ::google::protobuf::Message { inline void clear_has_minor_fragment_id(); inline void set_has_to_foreman(); inline void clear_has_to_foreman(); + inline void set_has_hj_op_id(); + inline void clear_has_hj_op_id(); ::google::protobuf::UnknownFieldSet _unknown_fields_; @@ -538,11 +547,12 @@ class RuntimeFilterBDef : public ::google::protobuf::Message { ::google::protobuf::int32 major_fragment_id_; ::google::protobuf::int32 minor_fragment_id_; ::google::protobuf::RepeatedField< ::google::protobuf::int32 > bloom_filter_size_in_bytes_; - ::google::protobuf::RepeatedPtrField< ::std::string> probe_fields_; bool to_foreman_; + ::google::protobuf::int32 hj_op_id_; + ::google::protobuf::RepeatedPtrField< ::std::string> probe_fields_; mutable int _cached_size_; - ::google::protobuf::uint32 _has_bits_[(6 + 31) / 32]; + ::google::protobuf::uint32 _has_bits_[(7 + 31) / 32]; friend void protobuf_AddDesc_BitData_2eproto(); friend void protobuf_AssignDesc_BitData_2eproto(); @@ -1043,6 +1053,28 @@ RuntimeFilterBDef::mutable_probe_fields() { return &probe_fields_; } +// optional int32 hj_op_id = 7; +inline bool RuntimeFilterBDef::has_hj_op_id() const { + return (_has_bits_[0] & 0x00000040u) != 0; +} +inline void RuntimeFilterBDef::set_has_hj_op_id() { + _has_bits_[0] |= 0x00000040u; +} +inline void RuntimeFilterBDef::clear_has_hj_op_id() { + _has_bits_[0] &= ~0x00000040u; +} +inline void RuntimeFilterBDef::clear_hj_op_id() { + hj_op_id_ = 0; + clear_has_hj_op_id(); +} +inline ::google::protobuf::int32 RuntimeFilterBDef::hj_op_id() const { + return hj_op_id_; +} +inline void RuntimeFilterBDef::set_hj_op_id(::google::protobuf::int32 value) { + set_has_hj_op_id(); + hj_op_id_ = value; +} + // @@protoc_insertion_point(namespace_scope) diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc index d0e4aa5..8bb6e07 100644 --- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc +++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc @@ -750,40 +750,40 @@ void protobuf_AddDesc_UserBitShared_2eproto() { "TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020" "\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022" "\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005" - "\022\032\n\026CANCELLATION_REQUESTED\020\006*\367\010\n\020CoreOpe" + "\022\032\n\026CANCELLATION_REQUESTED\020\006*\222\t\n\020CoreOpe" "ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS" "T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE" "\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS" "H_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGI" "NG_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDE" "R\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013" - "\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECT" - "ION_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGREG" - "ATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021" - "\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026" - "PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCA" - "N\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_S" - "CAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_" - "SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN" - "\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB" - "_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER" - "_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WINDO" - "W\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_SC" - "AN\020$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAFKA_SUB_SCA" - "N\020&\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLATTEN\020(\022\020\n\014L" - "ATERAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(HIVE_DRILL_" - "NATIVE_PARQUET_ROW_GROUP_SCAN\020+\022\r\n\tJDBC_" - "SCAN\020,\022\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017MAPRDB_SUB" - "_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRIT" - "ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI" - "TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S" - "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART" - "ITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207\022\022\n\016RU" - "NTIME_FILTER\0208*g\n\nSaslStatus\022\020\n\014SASL_UNK" - "NOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRE" - "SS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B" - ".\n\033org.apache.drill.exec.protoB\rUserBitS" - "haredH\001", 5447); + "\022\032\n\026RANGE_PARTITION_SENDER\020\014\022\n\n\006SCREEN\020\r" + "\022\034\n\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAM" + "ING_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTER" + "NAL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_" + "SORT\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHI" + "VE_SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\r" + "MOCK_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017D" + "IRECT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEX" + "T_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_" + "SCHEMA_SUB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025" + "\n\021PRODUCER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020" + "!\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rA" + "VRO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAF" + "KA_SUB_SCAN\020&\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLAT" + "TEN\020(\022\020\n\014LATERAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(H" + "IVE_DRILL_NATIVE_PARQUET_ROW_GROUP_SCAN\020" + "+\022\r\n\tJDBC_SCAN\020,\022\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017" + "MAPRDB_SUB_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n" + "\013KUDU_WRITER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017" + "\n\013JSON_WRITER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022" + "\022\n\016IMAGE_SUB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN" + "\0205\022\023\n\017PARTITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SC" + "AN\0207\022\022\n\016RUNTIME_FILTER\0208\022\017\n\013ROWKEY_JOIN\020" + "9*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSAS" + "L_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_" + "SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache" + ".drill.exec.protoB\rUserBitSharedH\001", 5474); ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( "UserBitShared.proto", &protobuf_RegisterTypes); UserCredentials::default_instance_ = new UserCredentials(); @@ -961,6 +961,7 @@ bool CoreOperatorType_IsValid(int value) { case 54: case 55: case 56: + case 57: return true; default: return false; diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h index 8494857..ab3063d 100644 --- a/contrib/native/client/src/protobuf/UserBitShared.pb.h +++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h @@ -216,7 +216,7 @@ enum CoreOperatorType { ORDERED_PARTITION_SENDER = 9, PROJECT = 10, UNORDERED_RECEIVER = 11, - RANGE_SENDER = 12, + RANGE_PARTITION_SENDER = 12, SCREEN = 13, SELECTION_VECTOR_REMOVER = 14, STREAMING_AGGREGATE = 15, @@ -260,11 +260,12 @@ enum CoreOperatorType { SEQUENCE_SUB_SCAN = 53, PARTITION_LIMIT = 54, PCAPNG_SUB_SCAN = 55, - RUNTIME_FILTER = 56 + RUNTIME_FILTER = 56, + ROWKEY_JOIN = 57 }; bool CoreOperatorType_IsValid(int value); const CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER; -const CoreOperatorType CoreOperatorType_MAX = RUNTIME_FILTER; +const CoreOperatorType CoreOperatorType_MAX = ROWKEY_JOIN; const int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1; const ::google::protobuf::EnumDescriptor* CoreOperatorType_descriptor(); diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java index 91ca787..3faa089 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java @@ -122,7 +122,7 @@ public abstract class HBasePushFilterIntoScan extends StoragePluginOptimizerRule newScanSpec, groupScan.getColumns()); newGroupsScan.setFilterPushedDown(true); - final ScanPrel newScanPrel = new ScanPrel(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable()); + final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable()); // Depending on whether is a project in the middle, assign either scan or copy of project to childRel. final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of(newScanPrel)); diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java index 019a67e..002d043 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java @@ -66,8 +66,8 @@ public class KafkaPushDownFilterIntoScan extends StoragePluginOptimizerRule { logger.info("Partitions ScanSpec after pushdown: " + newScanSpec); GroupScan newGroupScan = groupScan.cloneWithNewSpec(newScanSpec); - final ScanPrel newScanPrel = - new ScanPrel(scan, filter.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable()); + final ScanPrel newScanPrel = + new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable()); call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(newScanPrel))); } diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java index 8ad84c1..be157b4 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java @@ -77,7 +77,7 @@ public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule { } newGroupsScan.setFilterPushedDown(true); - final ScanPrel newScanPrel = new ScanPrel(scan, filter.getTraitSet(), + final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable()); if (mongoFilterBuilder.isAllExpressionsConverted()) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 70b2852..a688f37 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -182,11 +182,11 @@ public class ScanBatch implements CloseableRecordBatch { if(isRepeatableScan) { readers = readerList.iterator(); return IterOutcome.NONE; - } - else { + } else { releaseAssets(); // All data has been read. Release resource. done = true; - return IterOutcome.NONE;} + return IterOutcome.NONE; + } } /** @@ -204,11 +204,10 @@ public class ScanBatch implements CloseableRecordBatch { return false; } return true; - } - else {// Regular scan + } else { // Regular scan currentReader.close(); currentReader = null; - return true;// In regular case, we always continue the iteration, if no more reader, we will break out at the head of loop + return true; // In regular case, we always continue the iteration, if no more reader, we will break out at the head of loop } } @@ -283,7 +282,7 @@ public class ScanBatch implements CloseableRecordBatch { } } lastOutcome = IterOutcome.STOP; - throw UserException.systemError(e) + throw UserException.internalError(e) .addContext("Setup failed for", currentReaderClassName) .build(logger); } catch (UserException ex) { @@ -291,7 +290,7 @@ public class ScanBatch implements CloseableRecordBatch { throw ex; } catch (Exception ex) { lastOutcome = IterOutcome.STOP; - throw UserException.systemError(ex).build(logger); + throw UserException.internalError(ex).build(logger); } finally { oContext.getStats().stopProcessing(); } @@ -334,7 +333,7 @@ public class ScanBatch implements CloseableRecordBatch { } } catch(SchemaChangeException e) { // No exception should be thrown here. - throw UserException.systemError(e) + throw UserException.internalError(e) .addContext("Failure while allocating implicit vectors") .build(logger); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java index 1f7da38..86b870d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java @@ -378,7 +378,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { public int probeForKey(int recordsProcessed, int hashCode) throws SchemaChangeException { return hashTable.probeForKey(recordsProcessed, hashCode); } - public int getStartIndex(int probeIndex) { + public Pair<Integer, Boolean> getStartIndex(int probeIndex) { /* The current probe record has a key that matches. Get the index * of the first row in the build side that matches the current key */ @@ -387,15 +387,15 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT * join we keep track of which records we need to project at the end */ - hjHelper.setRecordMatched(compositeIndex); - return compositeIndex; + boolean matchExists = hjHelper.setRecordMatched(compositeIndex); + return Pair.of(compositeIndex, matchExists); } public int getNextIndex(int compositeIndex) { // in case of iner rows with duplicate keys, get the next one return hjHelper.getNextIndex(compositeIndex); } - public void setRecordMatched(int compositeIndex) { - hjHelper.setRecordMatched(compositeIndex); + public boolean setRecordMatched(int compositeIndex) { + return hjHelper.setRecordMatched(compositeIndex); } public List<Integer> getNextUnmatchedIndex() { return hjHelper.getNextUnmatchedIndex(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index c16812e..57b2d5b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -30,6 +30,7 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.commons.lang3.tuple.Pair; import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.ValueVector; @@ -47,8 +48,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { // Join type, INNER, LEFT, RIGHT or OUTER private JoinRelType joinType; - //joinControl object derived from the int type joinControl passed from outgoingBatch(HashJoinBatch) - //so we can do different things in hashtable for INTERSECT_DISTINCT and INTERSECT_ALL + // joinControl determines how to handle INTERSECT_DISTINCT vs. INTERSECT_ALL private JoinControl joinControl; private HashJoinBatch outgoingJoinBatch = null; @@ -325,16 +325,30 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { * of the first row in the build side that matches the current key * (and record this match in the bitmap, in case of a FULL/RIGHT join) */ - currentCompositeIdx = currPartition.getStartIndex(probeIndex); + Pair<Integer, Boolean> matchStatus = currPartition.getStartIndex(probeIndex); + + boolean matchExists = matchStatus.getRight(); + + if (joinControl.isIntersectDistinct() && matchExists) { + // since it is intersect distinct and we already have one record matched, move to next probe row + recordsProcessed++; + continue; + } + + currentCompositeIdx = matchStatus.getLeft(); outputRecords = outputRow(currPartition.getContainers(), currentCompositeIdx, probeBatch.getContainer(), recordsProcessed); /* Projected single row from the build side with matching key but there - * may be more rows with the same key. Check if that's the case + * may be more rows with the same key. Check if that's the case as long as + * we are not doing intersect distinct since it only cares about + * distinct values. */ - currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx); + currentCompositeIdx = joinControl.isIntersectDistinct() ? -1 : + currPartition.getNextIndex(currentCompositeIdx); + if (currentCompositeIdx == -1) { /* We only had one row in the build side that matched the current key * from the probe side. Drain the next row in the probe side. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java index 3b5566b..941f321 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java @@ -281,4 +281,10 @@ public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implemen super.close(); } + @Override + public void dump() { + logger.error("RowKeyJoinBatch[container={}, left={}, right={}, hasRowKeyBatch={}, rkJoinState={}]", + container, left, right, hasRowKeyBatch, rkJoinState); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index 2e2e760..a2fc069 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -29,6 +29,9 @@ import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.record.RecordBatch; public interface Partitioner { + // Keep the recordCount as (2^x) - 1 to better utilize the memory allocation in ValueVectors; however + // other criteria such as batch sizing in terms of actual MBytes rather than record count could also be applied + // by the operator. int DEFAULT_RECORD_BATCH_SIZE = (1 << 10) - 1; void setup(ExchangeFragmentContext context, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java index d8fc94d..6a54828 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java @@ -183,4 +183,11 @@ public class RangePartitionRecordBatch extends AbstractSingleRecordBatch<RangePa return counter; } + + @Override + public void dump() { + logger.error("RangePartitionRecordBatch[container={}, numPartitions={}, recordCount={}, partitionIdVector={}]", + container, numPartitions, recordCount, partitionIdVector); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java index 6f8ee0e..6409c8e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java @@ -34,7 +34,7 @@ import org.apache.drill.exec.util.Utilities; * Base class for logical/physical scan rel implemented in Drill. */ public abstract class DrillScanRelBase extends TableScan implements DrillRelNode { - private GroupScan groupScan; + protected GroupScan groupScan; protected final DrillTable drillTable; public DrillScanRelBase(RelOptCluster cluster, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java index c17d09f..518aa67 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java @@ -19,6 +19,10 @@ package org.apache.drill.exec.planner.index; import org.apache.drill.common.exceptions.DrillRuntimeException; +/** + * An InvalidIndexDefinitionException may be thrown if Drill does not recognize the + * type or expression of the index during the index discovery phase + */ public class InvalidIndexDefinitionException extends DrillRuntimeException { public InvalidIndexDefinitionException(String message) { super(message); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java index b380c28..11d7358 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java @@ -94,7 +94,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { RelOptUtil.createEquiJoinCondition(left, leftJoinKeys, right, rightJoinKeys, builder); - if (isRowKeyJoin == true) { + if (isRowKeyJoin) { RelNode newRel; if (settings.isIndexUseHashJoinNonCovering()) { HashJoinPrel hjPrel = new HashJoinPrel(left.getCluster(), left.getTraitSet(), left, @@ -109,7 +109,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { // since there is a restricted Scan on left side, assume original project return buildOriginalProject(newRel); } else { - //there is no restricted scan on left, do a regular rowkey join + // there is no restricted scan on left, do a regular rowkey join HashJoinPrel hjPrel = new HashJoinPrel(left.getCluster(), left.getTraitSet(), left, right, joinCondition, JoinRelType.INNER, false /* no swap */, null /* no runtime filter */, isRowKeyJoin, htControl); @@ -185,7 +185,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { indexScanRowType, builder, functionInfo)); // project the rowkey column from the index scan List<RexNode> indexProjectExprs = Lists.newArrayList(); - int rowKeyIndex = getRowKeyIndex(indexScanPrel.getRowType(), origScan);//indexGroupScan.getRowKeyOrdinal(); + int rowKeyIndex = getRowKeyIndex(indexScanPrel.getRowType(), origScan); assert rowKeyIndex >= 0; indexProjectExprs.add(RexInputRef.of(rowKeyIndex, indexScanPrel.getRowType())); @@ -204,7 +204,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { indexFilterPrel, indexProjectExprs, indexProjectRowType); RelTraitSet rightSideTraits = newTraitSet().plus(Prel.DRILL_PHYSICAL); - //if build(right) side does not exist, this index scan is the right most. + // if build(right) side does not exist, this index scan is the right most. if (right == null) { if (partition == DrillDistributionTrait.RANDOM_DISTRIBUTED && settings.getSliceTarget() < indexProjectPrel.getRows()) { @@ -220,7 +220,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { return converted; } - //if build(right) side exist, the plan we got in 'converted' is left (probe). Intersect with right(build) side + // if build(right) side exist, the plan we got in 'converted' is left (probe). Intersect with right(build) side RelNode finalRel = buildRowKeyJoin(converted, right, false, JoinControl.INTERSECT_DISTINCT); if (generateDistribution && @@ -310,7 +310,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { public RelNode convertChild(final RelNode filter, final RelNode input) throws InvalidRelException { Map<IndexDescriptor, RexNode> idxConditionMap = Maps.newLinkedHashMap(); boolean isAnyIndexAsync = false; - for(IndexDescriptor idx : indexInfoMap.keySet()) { + for (IndexDescriptor idx : indexInfoMap.keySet()) { idxConditionMap.put(idx, indexInfoMap.get(idx).indexCondition); if (!isAnyIndexAsync && idx.isAsyncIndex()) { isAnyIndexAsync = true; @@ -322,7 +322,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { int curIdx = 0; RexNode remnant = indexContext.getFilterCondition(); for (Map.Entry<IndexDescriptor, RexNode> pair : idxConditionMap.entrySet()) { - //For the last index, the generated join is distributed using createRangeDistRight instead! + // For the last index, the generated join is distributed using createRangeDistRight instead! generateDistribution = (idxConditionMap.entrySet().size()-1-curIdx) > 0; indexPlan = buildIntersectPlan(pair, indexPlan, generateDistribution); remnant = indexInfoMap.get(pair.getKey()).remainderCondition; @@ -333,12 +333,12 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator { final RelNode rangeDistRight = createRangeDistRight(indexPlan, rightRowKeyField, (DbGroupScan)IndexPlanUtils.getGroupScan(origScan)); - //now with index plan constructed, build plan of left(probe) side to use restricted db scan + // now with index plan constructed, build plan of left(probe) side to use restricted db scan Pair<RelNode, DbGroupScan> leftRelAndScan = buildRestrictedDBScan(remnant, isAnyIndexAsync); RelNode finalRel = buildRowKeyJoin(leftRelAndScan.left, rangeDistRight, true, JoinControl.DEFAULT); - if ( upperProject != null) { + if (upperProject != null) { ProjectPrel cap = new ProjectPrel(finalRel.getCluster(), finalRel.getTraitSet(), finalRel, IndexPlanUtils.getProjects(upperProject), upperProject.getRowType()); finalRel = cap; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java index 3a3c409..10f9567 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java @@ -169,6 +169,14 @@ public class DrillMergeProjectRule extends RelOptRule { return list; } + /** + * The purpose of the replace() method is to allow the caller to replace a 'top' and 'bottom' project with + * a single merged project with the assumption that caller knows exactly the semantics/correctness of merging + * the two projects. This is not applying the full fledged DrillMergeProjectRule. + * @param topProject + * @param bottomProject + * @return new project after replacement + */ public static Project replace(Project topProject, Project bottomProject) { final List<RexNode> newProjects = RelOptUtil.pushPastProject(topProject.getProjects(), bottomProject); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java index 1ae375b..0155734 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java @@ -58,7 +58,10 @@ public class BroadcastExchangePrel extends ExchangePrel{ final int rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH; final double cpuCost = broadcastFactor * DrillCostBase.SVR_CPU_COST * inputRows; - //we assume localhost network cost is 1/10 of regular network cost + // We assume localhost network cost is 1/10 of regular network cost + // ( c * num_bytes * (N - 1) ) + ( c * num_bytes * 0.1) + // = c * num_bytes * (N - 0.9) + // TODO: a similar adjustment should be made to HashExchangePrel final double networkCost = broadcastFactor * DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth * (numEndPoints - 0.9); return new DrillCostBase(inputRows, cpuCost, 0, networkCost); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java index 5204495..d903165 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java @@ -128,8 +128,8 @@ public class ConvertCountToDirectScan extends Prule { final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount()); final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats); - final ScanPrel newScan = new ScanPrel(scan, scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), - directScan, scanRowType); + final DirectScanPrel newScan = new DirectScanPrel(scan.getCluster(), + scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), directScan, scanRowType); final ProjectPrel newProject = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL) .plus(DrillDistributionTrait.SINGLETON), newScan, prepareFieldExpressions(scanRowType), agg.getRowType()); @@ -158,7 +158,6 @@ public class ConvertCountToDirectScan extends Prule { for (int i = 0; i < agg.getAggCallList().size(); i++) { AggregateCall aggCall = agg.getAggCallList().get(i); - //for (AggregateCall aggCall : agg.getAggCallList()) { long cnt; // rule can be applied only for count function, return empty counts diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java index 4db5ba2..8a46f86 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java @@ -48,15 +48,6 @@ public class ScanPrel extends DrillScanRelBase implements Prel, HasDistributionA private final RelDataType rowType; - public ScanPrel(RelNode old, RelTraitSet traitSets, GroupScan scan, RelDataType rowType) { - this(old.getCluster(), traitSets, scan, rowType); - } - - public ScanPrel(RelOptCluster cluster, RelTraitSet traits, GroupScan groupScan, RelDataType rowType) { - super(cluster, traits); - this.groupScan = getCopy(groupScan); - } - public ScanPrel(RelOptCluster cluster, RelTraitSet traits, GroupScan groupScan, RelDataType rowType, RelOptTable table) { super(cluster, traits, getCopy(groupScan), table); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java index 6ee05eb..b4fada6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java @@ -44,7 +44,7 @@ public class ScanPrule extends Prule{ final RelTraitSet traits = scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(partition); - final ScanPrel newScan = new ScanPrel(scan, traits, groupScan, scan.getRowType(), scan.getTable()); + final ScanPrel newScan = new ScanPrel(scan.getCluster(), traits, groupScan, scan.getRowType(), scan.getTable()); call.transformTo(newScan); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaPushFilterIntoRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaPushFilterIntoRecordGenerator.java index 92332c3..44d2394 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaPushFilterIntoRecordGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaPushFilterIntoRecordGenerator.java @@ -99,7 +99,7 @@ public abstract class InfoSchemaPushFilterIntoRecordGenerator extends StoragePlu final InfoSchemaGroupScan newGroupsScan = new InfoSchemaGroupScan(groupScan.getTable(), infoSchemaFilter); newGroupsScan.setFilterPushedDown(true); - RelNode input = new ScanPrel(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable()); + RelNode input = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable()); if (project != null) { input = project.copy(project.getTraitSet(), input, project.getProjects(), filter.getRowType()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java index a8b981c..6efd44d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java @@ -167,7 +167,7 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule { return; } - RelNode newScan = new ScanPrel(scan, scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable()); + RelNode newScan = new ScanPrel(scan.getCluster(), scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable()); if (project != null) { newScan = project.copy(project.getTraitSet(), ImmutableList.of(newScan)); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java index aafcb4d..e2f80d8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java @@ -132,7 +132,7 @@ public class HashPartitionTest { { int compositeIndex = hashPartition.probeForKey(1, 12); - int startIndex = hashPartition.getStartIndex(compositeIndex); + int startIndex = hashPartition.getStartIndex(compositeIndex).getLeft(); int nextIndex = hashPartition.getNextIndex(startIndex); Assert.assertEquals(2, startIndex);