[ 
https://issues.apache.org/jira/browse/DRILL-6381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622420#comment-16622420
 ] 

ASF GitHub Bot commented on DRILL-6381:
---------------------------------------

gparai commented on a change in pull request #1466: DRILL-6381: Add support for index based planning and execution
URL: https://github.com/apache/drill/pull/1466#discussion_r219260403
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
 ##########
 @@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.index.generators;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.DbGroupScan;
+import org.apache.drill.exec.physical.base.IndexGroupScan;
+import org.apache.drill.exec.planner.common.JoinControl;
+import org.apache.drill.exec.planner.index.IndexLogicalPlanCallContext;
+import org.apache.drill.exec.planner.index.IndexDescriptor;
+import org.apache.drill.exec.planner.index.FunctionalIndexInfo;
+import org.apache.drill.exec.planner.index.FunctionalIndexHelper;
+import org.apache.drill.exec.planner.index.IndexPlanUtils;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
+import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.Prule;
+import org.apache.drill.exec.planner.physical.RowKeyJoinPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+/**
+ * Generate a non-covering index plan that is equivalent to the original plan. The non-covering plan consists
+ * of a join-back between an index lookup and the primary table. This join-back is performed using a rowkey join.
+ * For the primary table, we use a restricted scan that allows doing skip-scan instead of sequential scan.
+ *
+ * Original Plan:
+ *               Filter
+ *                 |
+ *            DBGroupScan
+ *
+ * New Plan:
+ *
+ *            RowKeyJoin
+ *          /         \
+ * Remainder Filter  Exchange
+ *         |            |
+ *   Restricted    Filter (with index columns only)
+ *   DBGroupScan        |
+ *                  IndexGroupScan
+ *
+ * This plan will be further optimized by the filter pushdown rule of the Index plugin which should
+ * push the index column filters into the index scan.
+ */
+public class NonCoveringIndexPlanGenerator extends AbstractIndexPlanGenerator {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NonCoveringIndexPlanGenerator.class);
+  final protected IndexGroupScan indexGroupScan;
+  final private IndexDescriptor indexDesc;
+  // Ideally This functionInfo should be cached along with indexDesc.
+  final protected FunctionalIndexInfo functionInfo;
+
+  public NonCoveringIndexPlanGenerator(IndexLogicalPlanCallContext indexContext,
+                                       IndexDescriptor indexDesc,
+                                       IndexGroupScan indexGroupScan,
+                                       RexNode indexCondition,
+                                       RexNode remainderCondition,
+                                       RexBuilder builder,
+                                       PlannerSettings settings) {
+    super(indexContext, indexCondition, remainderCondition, builder, settings);
+    this.indexGroupScan = indexGroupScan;
+    this.indexDesc = indexDesc;
+    this.functionInfo = indexDesc.getFunctionalInfo();
+  }
+
+  @Override
+  public RelNode convertChild(final RelNode topRel, final RelNode input) throws InvalidRelException {
+
+    if (indexGroupScan == null) {
+      logger.error("Null indexgroupScan in NonCoveringIndexPlanGenerator.convertChild");
+      return null;
+    }
+
+    RelDataType dbscanRowType = convertRowType(origScan.getRowType(), origScan.getCluster().getTypeFactory());
+    RelDataType indexScanRowType = FunctionalIndexHelper.convertRowTypeForIndexScan(
+        origScan, indexContext.getOrigMarker(), indexGroupScan, functionInfo);
+
+    DrillDistributionTrait partition = IndexPlanUtils.scanIsPartition(IndexPlanUtils.getGroupScan(origScan))?
+        DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON;
+
+    ScanPrel indexScanPrel = new ScanPrel(origScan.getCluster(),
+        origScan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(partition), indexGroupScan, indexScanRowType, origScan.getTable());
+    DbGroupScan origDbGroupScan = (DbGroupScan)IndexPlanUtils.getGroupScan(origScan);
+
+    // right (build) side of the rowkey join: do a distribution of project-filter-indexscan subplan
+    RexNode convertedIndexCondition = FunctionalIndexHelper.convertConditionForIndexScan(indexCondition,
+        origScan, indexScanRowType, builder, functionInfo);
+    FilterPrel  rightIndexFilterPrel = new FilterPrel(indexScanPrel.getCluster(), indexScanPrel.getTraitSet(),
+          indexScanPrel, convertedIndexCondition);
+
+    double finalRowCount = indexGroupScan.getRowCount(indexContext.getOrigCondition(), origScan);
+
+    // project the rowkey column from the index scan
+    List<RexNode> rightProjectExprs = Lists.newArrayList();
+    int rightRowKeyIndex = getRowKeyIndex(indexScanPrel.getRowType(), origScan);//indexGroupScan.getRowKeyOrdinal();
+    assert rightRowKeyIndex >= 0;
+
+    rightProjectExprs.add(RexInputRef.of(rightRowKeyIndex, indexScanPrel.getRowType()));
+
+    final List<RelDataTypeField> indexScanFields = indexScanPrel.getRowType().getFieldList();
+
+    final RelDataTypeFactory.FieldInfoBuilder rightFieldTypeBuilder =
+        indexScanPrel.getCluster().getTypeFactory().builder();
+
+    // build the row type for the right Project
+    final RelDataTypeField rightRowKeyField = indexScanFields.get(rightRowKeyIndex);
+    rightFieldTypeBuilder.add(rightRowKeyField);
+    final RelDataType rightProjectRowType = rightFieldTypeBuilder.build();
+
+    final ProjectPrel rightIndexProjectPrel = new ProjectPrel(indexScanPrel.getCluster(), indexScanPrel.getTraitSet(),
+        rightIndexFilterPrel, rightProjectExprs, rightProjectRowType);
+
+    // create a RANGE PARTITION on the right side (this could be removed later during ExcessiveExchangeIdentifier phase
+    // if the estimated row count is smaller than slice_target
+    final RelNode rangeDistRight = createRangeDistRight(rightIndexProjectPrel, rightRowKeyField, origDbGroupScan);
+
+    // the range partitioning adds an extra column for the partition id but in the final plan we already have a
+    // renaming Project for the _id field inserted as part of the JoinPrelRenameVisitor. Thus, we are not inserting
+    // a separate Project here.
+    final RelNode convertedRight = rangeDistRight;
+
+    // left (probe) side of the rowkey join
+
+    List<SchemaPath> cols = new ArrayList<SchemaPath>(origDbGroupScan.getColumns());
+    if (!checkRowKey(cols)) {
+      cols.add(origDbGroupScan.getRowKeyPath());
+    }
+
+    // Create a restricted groupscan from the primary table's groupscan
+    DbGroupScan restrictedGroupScan  = (DbGroupScan)origDbGroupScan.getRestrictedScan(cols);
+    if (restrictedGroupScan == null) {
+      logger.error("Null restricted groupscan in NonCoveringIndexPlanGenerator.convertChild");
+      return null;
+    }
+    // Set left side (restricted scan) row count as rows returned from right side (index scan)
+    DrillScanRel rightIdxRel = new DrillScanRel(origScan.getCluster(), origScan.getTraitSet(),
+        origScan.getTable(), origScan.getRowType(), indexContext.getScanColumns());
+    double rightIdxRowCount = indexGroupScan.getRowCount(indexCondition, rightIdxRel);
+    restrictedGroupScan.setRowCount(null, rightIdxRowCount, rightIdxRowCount);
+
+    RelTraitSet origScanTraitSet = origScan.getTraitSet();
+    RelTraitSet restrictedScanTraitSet = origScanTraitSet.plus(Prel.DRILL_PHYSICAL);
+
+    // Create the collation traits for restricted scan based on the index columns under the
+    // conditions that (a) the index actually has collation property (e.g hash indexes don't)
+    // and (b) if an explicit sort operation is not enforced
+    RelCollation collation = null;
+    if (indexDesc.getCollation() != null &&
+         !settings.isIndexForceSortNonCovering()) {
+      collation = IndexPlanUtils.buildCollationNonCoveringIndexScan(indexDesc, indexScanRowType, dbscanRowType, indexContext);
+      if (restrictedScanTraitSet.contains(RelCollationTraitDef.INSTANCE)) { // replace existing trait
+        restrictedScanTraitSet = restrictedScanTraitSet.plus(partition).replace(collation);
+      } else {  // add new one
+        restrictedScanTraitSet = restrictedScanTraitSet.plus(partition).plus(collation);
+      }
+    }
+
+    ScanPrel dbScan = new ScanPrel(origScan.getCluster(),
+        restrictedScanTraitSet, restrictedGroupScan, dbscanRowType, origScan.getTable());
+    RelNode lastLeft = dbScan;
+    // build the row type for the left Project
+    List<RexNode> leftProjectExprs = Lists.newArrayList();
+    int leftRowKeyIndex = getRowKeyIndex(dbScan.getRowType(), origScan);
+    final RelDataTypeField leftRowKeyField = dbScan.getRowType().getFieldList().get(leftRowKeyIndex);
+    final RelDataTypeFactory.FieldInfoBuilder leftFieldTypeBuilder =
+        dbScan.getCluster().getTypeFactory().builder();
+
+    //we are applying the same index condition to primary table's restricted scan, the reason
+    // for this is, the scans on index table and primary table are not a transaction, meaning that _after_ index scan,
+    // primary table might already have data get updated, thus some rows picked by index were modified and no more satisfy the
+    // index condition. By applying the same index condition again here, we will avoid the possibility to have some
+    //not-wanted records get into downstream operators in such scenarios.
+    //the remainder condition will be applied on top of RowKeyJoin.
+    FilterPrel leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
 
 Review comment:
   Yes, adding it to the interface would be good. We would have to make a similar change in the index intersection plan generator as well.
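
   For readers following the hunk above, here is a minimal plain-Java sketch of the join-back it describes: filter the index, look up each surviving row key in the primary table, re-apply the index condition to the fetched row, then apply the remainder condition. This is an illustration only, not Drill code. `RowKeyJoinSketch`, `Row`, `IndexEntry`, `joinBack`, and the sample columns are all hypothetical names, and the real planner builds `Prel` operators rather than iterating over in-memory collections.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;
import java.util.function.Predicate;

// Hypothetical illustration only: none of these types or methods exist in Drill.
public class RowKeyJoinSketch {

  /** A primary-table row: row key plus two ordinary columns. */
  public static final class Row {
    public final String rowKey;
    public final int age;
    public final String city;

    public Row(String rowKey, int age, String city) {
      this.rowKey = rowKey;
      this.age = age;
      this.city = city;
    }
  }

  /** An index entry: the indexed column value and the row key it points to. */
  public static final class IndexEntry {
    public final int age;
    public final String rowKey;

    public IndexEntry(int age, String rowKey) {
      this.age = age;
      this.rowKey = rowKey;
    }
  }

  /** Returns the row keys of primary rows that survive the join-back. */
  public static List<String> joinBack(List<IndexEntry> index,
                                      Map<String, Row> primary,
                                      IntPredicate indexCondition,
                                      Predicate<Row> remainderCondition) {
    List<String> result = new ArrayList<>();
    for (IndexEntry entry : index) {
      // Right side: filter on the index column and keep only the row key.
      if (!indexCondition.test(entry.age)) {
        continue;
      }
      // Left side: fetch only that row from the primary table by row key.
      Row row = primary.get(entry.rowKey);
      if (row == null) {
        continue; // the row was deleted after the index was read
      }
      // Re-apply the index condition: the two scans are not one transaction,
      // so the row may have changed since the index entry was read.
      if (!indexCondition.test(row.age)) {
        continue;
      }
      // The remainder condition is applied after the join.
      if (remainderCondition.test(row)) {
        result.add(row.rowKey);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<IndexEntry> index = new ArrayList<>();
    index.add(new IndexEntry(35, "r1"));
    index.add(new IndexEntry(40, "r2")); // stale: r2 has since been updated
    index.add(new IndexEntry(50, "r3"));

    Map<String, Row> primary = new HashMap<>();
    primary.put("r1", new Row("r1", 35, "Austin"));
    primary.put("r2", new Row("r2", 20, "Austin"));
    primary.put("r3", new Row("r3", 50, "Boston"));

    List<String> keys = joinBack(index, primary,
        age -> age > 30, row -> row.city.equals("Austin"));
    System.out.println(keys); // prints [r1]
  }
}
```

   In this sketch `r2` is dropped by the re-applied index condition because its stale index entry no longer matches the row, and `r3` is dropped by the remainder condition.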

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add capability to do index based planning and execution
> -------------------------------------------------------
>
>                 Key: DRILL-6381
>                 URL: https://issues.apache.org/jira/browse/DRILL-6381
>             Project: Apache Drill
>          Issue Type: New Feature
>          Components: Execution - Relational Operators, Query Planning & Optimization
>            Reporter: Aman Sinha
>            Assignee: Aman Sinha
>            Priority: Major
>             Fix For: 1.15.0
>
>
> If the underlying data source supports indexes (primary and secondary 
> indexes), Drill should leverage those during planning and execution in order 
> to improve query performance.  
> On the planning side, the Drill planner should be enhanced to provide an 
> abstraction layer which expresses the index metadata and statistics.  Further, 
> cost-based index selection is needed to decide which index(es) are 
> suitable.  
> On the execution side, appropriate operator enhancements would be needed to 
> handle different categories of indexes, such as covering and non-covering 
> indexes, taking into consideration that the index data may not be co-located 
> with the primary table, i.e., a global index.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
