kasakrisz commented on code in PR #4943:
URL: https://github.com/apache/hive/pull/4943#discussion_r1495742105


##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveRowIsDeletedPropagator.java:
##########
@@ -17,72 +17,128 @@
  */
 package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
 
+import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
+import 
org.apache.hadoop.hive.ql.ddl.view.materialized.alter.rebuild.AlterMaterializedViewRebuildAnalyzer;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelShuttle;
-import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelShuttleImpl;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
 
 /**
- * {@link HiveRelShuttle} to propagate rowIsDeleted column to all HiveRelNodes' rowType in the plan.
- * General rule: we expect that the rowIsDeleted column is the last column in the input rowType of the current
+ * {@link ReflectiveVisitor} to propagate row is deleted or inserted columns to all HiveRelNodes' rowType in the plan.
+ * General rule: we expect that these columns are the last columns in the input rowType of the current
  * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode}.
+ *
+ * This class is part of incremental rebuild of materialized view plan generation.
+ * <br>
+ * @see AlterMaterializedViewRebuildAnalyzer
+ * @see HiveAggregateInsertDeleteIncrementalRewritingRule
  */
-public class HiveRowIsDeletedPropagator extends HiveRelShuttleImpl {
+public class HiveRowIsDeletedPropagator implements ReflectiveVisitor {
 
-  protected final RelBuilder relBuilder;
+  private final RelBuilder relBuilder;
+  private final ReflectUtil.MethodDispatcher<RelNode> dispatcher;
 
   public HiveRowIsDeletedPropagator(RelBuilder relBuilder) {
     this.relBuilder = relBuilder;
+    this.dispatcher = ReflectUtil.createMethodDispatcher(
+        RelNode.class, this, "visit", RelNode.class, Context.class);
   }
 
   public RelNode propagate(RelNode relNode) {
-    return relNode.accept(this);
-  }
-
-  /**
-   * Create a Projection on top of TS that contains all columns from TS.
-   * Let rowIsDeleted the last column in the new Project.
-   * Enable fetching Deleted rows in TS.
-   * @param scan - TS to transform
-   * @return - new TS and a optionally a Project on top of it.
-   */
-  @Override
-  public RelNode visit(HiveTableScan scan) {
-    RelDataType tableRowType = scan.getTable().getRowType();
-    RelDataTypeField column = tableRowType.getField(
-        VirtualColumn.ROWISDELETED.getName(), false, false);
-    if (column == null) {
-      // This should not happen since Virtual columns are propagated for all native table scans in
-      // CalcitePlanner.genTableLogicalPlan()
-      throw new ColumnPropagationException("TableScan " + scan + " row schema 
does not contain " +
-          VirtualColumn.ROWISDELETED.getName() + " virtual column");
+    return dispatcher.invoke(relNode, new Context());
+  }
+
+  private RelNode visitChild(RelNode parent, int i, RelNode child, Context 
context) {
+    RelNode newRel = dispatcher.invoke(child, context);
+    final List<RelNode> newInputs = new ArrayList<>(parent.getInputs());
+    newInputs.set(i, newRel);
+    return parent.copy(parent.getTraitSet(), newInputs);
+  }
+
+  private RelNode visitChildren(RelNode rel, Context context) {
+    for (Ord<RelNode> input : Ord.zip(rel.getInputs())) {
+      rel = visitChild(rel, input.i, input.e, context);
     }
+    return rel;
+  }
+
+  public static class Context {
+    private final Map<Integer, RexNode> rowIdPredicates = new HashMap<>();
+  }
+
+  public RelNode visit(RelNode relNode, Context context) {
+    return visitChildren(relNode, context);
+  }
+
+  // Add a project on top of the TS.
+  // Project two boolean columns: one for indicating the row is deleted another
+  // for newly inserted.
+  // A row is considered to be
+  //  - deleted when the ROW_IS_DELETED virtual column is true and the writeId of the record is higher than the
+  //    saved in materialized view snapshot metadata
+  //  - newly inserted when the ROW_IS_DELETED virtual column is false and the writeId of the record is higher than the
+  //    saved in materialized view snapshot metadata
+  public RelNode visit(HiveTableScan scan, Context context) {
+    RelDataType tableRowType = scan.getTable().getRowType();
+    RelDataTypeField rowIdField = getVirtualColumnField(tableRowType, 
VirtualColumn.ROWID, scan);
+    RexNode rowIdPredicate = 
context.rowIdPredicates.get(rowIdField.getIndex());
+
+    RelDataTypeField rowIsDeletedField = getVirtualColumnField(tableRowType, 
VirtualColumn.ROWISDELETED, scan);
 
     RexBuilder rexBuilder = relBuilder.getRexBuilder();
 
     List<RexNode> projects = new ArrayList<>(tableRowType.getFieldCount());
     List<String> projectNames = new ArrayList<>(tableRowType.getFieldCount());
     populateProjects(rexBuilder, tableRowType, projects, projectNames);
     // Propagated column is already in the TS move it to the end
-    RexNode propagatedColumn = projects.remove(column.getIndex());
-    projects.add(propagatedColumn);
-    String propagatedColumnName = projectNames.remove(column.getIndex());
-    projectNames.add(propagatedColumnName);
+    RexNode rowIsDeleted = projects.remove(rowIsDeletedField.getIndex());
+    projects.add(rowIsDeleted);
+    // predicates on rowId introduced by HiveAugmentMaterializationRule into the original MV definition query plan
+    // on top of each TS operators.
+    // Later that plan is transformed to a Union rewrite plan where all rowId predicates are pulled up on top of
+    // the top Join operator.
+    if (rowIdPredicate == null) {
+      // If a table have not changed then no predicate is introduced for the TS. All rows in the table should remain.
+      projects.add(rexBuilder.makeLiteral(false));
+      projects.add(rexBuilder.makeLiteral(false));
+    } else {
+      // A row is deleted if ROW_IS_DELETED is true and rowId > <saved_rowId>
+      projects.add(rexBuilder.makeCall(SqlStdOperatorTable.AND, rowIsDeleted, 
rowIdPredicate));
+      // A row is newly inserted if ROW_IS_DELETED is false and rowId > <saved_rowId>
+      projects.add(rexBuilder.makeCall(SqlStdOperatorTable.AND,
+          rexBuilder.makeCall(SqlStdOperatorTable.NOT, rowIsDeleted), 
rowIdPredicate));
+    }
+    String rowIsDeletedName = 
projectNames.remove(rowIsDeletedField.getIndex());
+    projectNames.add(rowIsDeletedName);
+    projectNames.add("_deleted");

Review Comment:
   Moved.



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveRowIsDeletedPropagator.java:
##########
@@ -92,86 +148,146 @@ public RelNode visit(HiveTableScan scan) {
         .build();
   }
 
-  /**
-   * Create a new Project with original projected columns plus add rowIsDeleted as last column referencing
-   * the last column of the input {@link RelNode}.
-   * @param project - {@link HiveProject to transform}
-   * @return new Project
-   */
-  @Override
-  public RelNode visit(HiveProject project) {
-    RelNode newProject = visitChild(project, 0, project.getInput());
+  // Add the new columns(_deleted, _inserted) to the original project
+  public RelNode visit(HiveProject project, Context context) {
+    RelNode newProject = visitChild(project, 0, project.getInput(), context);
     RelNode projectInput = newProject.getInput(0);
-    int rowIsDeletedIndex = projectInput.getRowType().getFieldCount() - 1;
-    List<RexNode> newProjects = new 
ArrayList<>(project.getRowType().getFieldCount() + 1);
-    newProjects.addAll(project.getProjects());
 
-    RexNode rowIsDeleted = relBuilder.getRexBuilder().makeInputRef(
-            
projectInput.getRowType().getFieldList().get(rowIsDeletedIndex).getType(), 
rowIsDeletedIndex);
-    newProjects.add(rowIsDeleted);
+    List<RexNode> newProjects = new ArrayList<>(project.getProjects().size() + 
2);
+    newProjects.addAll(project.getProjects());
+    newProjects.add(createInputRef(projectInput, 2));
+    newProjects.add(createInputRef(projectInput, 1));
 
     return relBuilder
-            .push(projectInput)
-            .project(newProjects)
-            .build();
-  }
-
-  /**
-   * Create new Join and a Project on top of it.
-   * @param join - {@link HiveJoin} to transform
-   * @return - new Join with a Project on top
-   */
-  @Override
-  public RelNode visit(HiveJoin join) {
-    // Propagate rowISDeleted to left input
-    RelNode tmpJoin = visitChild(join, 0, join.getInput(0));
-    RelNode leftInput = tmpJoin.getInput(0);
-    RelDataType leftRowType = tmpJoin.getInput(0).getRowType();
-    int leftRowIsDeletedIndex = leftRowType.getFieldCount() - 1;
-    // Propagate rowISDeleted to right input
-    tmpJoin = visitChild(join, 1, join.getInput(1));
-    RelNode rightInput = tmpJoin.getInput(1);
-    RelDataType rightRowType = rightInput.getRowType();
-    int rightRowIsDeletedIndex = rightRowType.getFieldCount() - 1;
-
-    // Create input ref to rowIsDeleted columns in left and right inputs
-    RexBuilder rexBuilder = relBuilder.getRexBuilder();
-    RexNode leftRowIsDeleted = rexBuilder.makeInputRef(
-            leftRowType.getFieldList().get(leftRowIsDeletedIndex).getType(), 
leftRowIsDeletedIndex);
-    RexNode rightRowIsDeleted = rexBuilder.makeInputRef(
-            rightRowType.getFieldList().get(rightRowIsDeletedIndex).getType(),
-            leftRowType.getFieldCount() + rightRowIsDeletedIndex);
-
-    RexNode newJoinCondition;
-    int newLeftFieldCount;
-    if 
(join.getInput(0).getRowType().getField(VirtualColumn.ROWISDELETED.getName(), 
false, false) == null) {
-      // Shift column references refers columns coming from right input by one in join condition since the new left input
-      // has a new column
-      newJoinCondition = new InputRefShifter(leftRowType.getFieldCount() - 1, 
relBuilder)
-          .apply(join.getCondition());
-
-      newLeftFieldCount = leftRowType.getFieldCount() - 1;
-    } else {
-      newJoinCondition = join.getCondition();
-      newLeftFieldCount = leftRowType.getFieldCount();
+        .push(projectInput)
+        .project(newProjects)
+        .build();
+  }
+
+  // Union rewrite algorithm pulls up all the predicates on rowId on top of top Join operator:
+  // Example:
+  //   HiveUnion(all=[true])
+  //    ...
+  //    HiveFilter(condition=[OR(<(1, $14.writeid), <(1, $6.writeid))])
+  //      HiveJoin(condition=[=($0, $8)], joinType=[inner], algorithm=[none], cost=[not available])
+  // Check the filter condition and collect operands of OR expressions referencing only one column
+  public RelNode visit(HiveFilter filter, Context context) {
+    RexNode condition = filter.getCondition();
+
+    // The condition might be a single predicate on the rowId (if only one table changed)
+    RexInputRef rexInputRef = findPossibleRowIdRef(filter.getCondition());
+    if (rexInputRef != null) {
+      context.rowIdPredicates.put(rexInputRef.getIndex(), 
filter.getCondition());
+      return visitChild(filter, 0, filter.getInput(0), context);
+    }
+
+    if (!condition.isA(SqlKind.OR)) {
+      return visitChild(filter, 0, filter.getInput(0), context);
     }
 
+    for (RexNode operand : ((RexCall)condition).operands) {
+      RexInputRef inputRef = findPossibleRowIdRef(operand);
+      if (inputRef != null) {
+        context.rowIdPredicates.put(inputRef.getIndex(), operand);
+      }
+    }
+
+    return visitChild(filter, 0, filter.getInput(0), context);
+  }
+
+  private RexInputRef findPossibleRowIdRef(RexNode operand) {
+    Set<RexInputRef> inputRefs = findRexInputRefs(operand);
+    if (inputRefs.size() != 1) {
+      return null;
+    }
+
+    // This is a candidate for predicate on rowId
+    return inputRefs.iterator().next();
+  }
+
+  // Propagate new column to each side of the join.
+  // Create a project to combine the propagated expressions.
+  // Create a filter to remove rows which are joined from a deleted and a newly inserted row.
+  public RelNode visit(HiveJoin join, Context context) {
+    // Propagate columns to left input
+    RelNode tmpJoin = visitChild(join, 0, join.getInput(0), context);
+    RelNode newLeftInput = tmpJoin.getInput(0);
+    RelDataType newLeftRowType = newLeftInput.getRowType();
+    // Propagate columns to right input.
+    // All column references should be shifted in candidate predicates to the left
+    Context rightContext = new Context();
+    int originalLeftFieldCount = join.getInput(0).getRowType().getFieldCount();
+    for (Map.Entry<Integer, RexNode> entry : 
context.rowIdPredicates.entrySet()) {
+      if (entry.getKey() > originalLeftFieldCount) {
+        rightContext.rowIdPredicates.put(entry.getKey() - 
originalLeftFieldCount,
+          new InputRefShifter(originalLeftFieldCount, -originalLeftFieldCount, 
relBuilder).apply(entry.getValue()));
+      }
+    }
+    tmpJoin = visitChild(join, 1, join.getInput(1), rightContext);
+    RelNode newRightInput = tmpJoin.getInput(1);
+    RelDataType newRightRowType = newRightInput.getRowType();
+
+    // Create input refs to propagated columns in left and right inputs
+    int rightAnyDeletedIndex = newRightRowType.getFieldCount() - 2;
+    int rightAnyInsertedIndex = newRightRowType.getFieldCount() - 1;
+    RexBuilder rexBuilder = relBuilder.getRexBuilder();
+    RexNode leftDeleted = createInputRef(newLeftInput, 2);
+    RexNode leftInserted = createInputRef(newLeftInput, 1);
+    RexNode rightDeleted = rexBuilder.makeInputRef(
+        newRightRowType.getFieldList().get(rightAnyDeletedIndex).getType(),
+        newLeftRowType.getFieldCount() + rightAnyDeletedIndex);
+    RexNode rightInserted = rexBuilder.makeInputRef(
+        newRightRowType.getFieldList().get(rightAnyInsertedIndex).getType(),
+        newLeftRowType.getFieldCount() + rightAnyInsertedIndex);
+
+    // Shift column references refers columns coming from right input in join condition since the new left input
+    // has a new columns
+    int newLeftFieldCount = newLeftRowType.getFieldCount() - 2;
+    RexNode newJoinCondition = new InputRefShifter(newLeftFieldCount, 2, 
relBuilder).apply(join.getCondition());
+
     // Collect projected columns: all columns from both inputs
-    List<RexNode> projects = new ArrayList<>(newLeftFieldCount + 
rightRowType.getFieldCount() + 1);
-    List<String> projectNames = new ArrayList<>(newLeftFieldCount + 
rightRowType.getFieldCount() + 1);
-    populateProjects(rexBuilder, leftRowType, 0, newLeftFieldCount, projects, 
projectNames);
-    populateProjects(rexBuilder, rightRowType, leftRowType.getFieldCount(), 
rightRowType.getFieldCount(), projects, projectNames);
+    List<RexNode> projects = new ArrayList<>(newLeftFieldCount + 
newRightRowType.getFieldCount() + 1);
+    List<String> projectNames = new ArrayList<>(newLeftFieldCount + 
newRightRowType.getFieldCount() + 1);
+    populateProjects(rexBuilder, newLeftRowType, 0, newLeftFieldCount, 
projects, projectNames);
+    populateProjects(rexBuilder, newRightRowType, 
newLeftRowType.getFieldCount(),
+        newRightRowType.getFieldCount() - 2, projects, projectNames);
+
+    // Create derived expressions
+    projects.add(rexBuilder.makeCall(SqlStdOperatorTable.OR, leftDeleted, 
rightDeleted));
+    projects.add(rexBuilder.makeCall(SqlStdOperatorTable.OR, leftInserted, 
rightInserted));
+    projectNames.add("_any_deleted");

Review Comment:
   Moved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to