kasakrisz commented on code in PR #4943: URL: https://github.com/apache/hive/pull/4943#discussion_r1495742105
########## ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveRowIsDeletedPropagator.java: ########## @@ -17,72 +17,128 @@ */ package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views; +import org.apache.calcite.linq4j.Ord; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.rex.RexVisitor; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ReflectUtil; +import org.apache.calcite.util.ReflectiveVisitor; +import org.apache.hadoop.hive.ql.ddl.view.materialized.alter.rebuild.AlterMaterializedViewRebuildAnalyzer; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelShuttle; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelShuttleImpl; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.util.Arrays.asList; /** - * {@link HiveRelShuttle} to propagate rowIsDeleted column to all HiveRelNodes' rowType in the plan. 
- * General rule: we expect that the rowIsDeleted column is the last column in the input rowType of the current + * {@link ReflectiveVisitor} to propagate row is deleted or inserted columns to all HiveRelNodes' rowType in the plan. + * General rule: we expect that these columns are the last columns in the input rowType of the current * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode}. + * + * This class is part of incremental rebuild of materialized view plan generation. + * <br> + * @see AlterMaterializedViewRebuildAnalyzer + * @see HiveAggregateInsertDeleteIncrementalRewritingRule */ -public class HiveRowIsDeletedPropagator extends HiveRelShuttleImpl { +public class HiveRowIsDeletedPropagator implements ReflectiveVisitor { - protected final RelBuilder relBuilder; + private final RelBuilder relBuilder; + private final ReflectUtil.MethodDispatcher<RelNode> dispatcher; public HiveRowIsDeletedPropagator(RelBuilder relBuilder) { this.relBuilder = relBuilder; + this.dispatcher = ReflectUtil.createMethodDispatcher( + RelNode.class, this, "visit", RelNode.class, Context.class); } public RelNode propagate(RelNode relNode) { - return relNode.accept(this); - } - - /** - * Create a Projection on top of TS that contains all columns from TS. - * Let rowIsDeleted the last column in the new Project. - * Enable fetching Deleted rows in TS. - * @param scan - TS to transform - * @return - new TS and a optionally a Project on top of it. 
- */ - @Override - public RelNode visit(HiveTableScan scan) { - RelDataType tableRowType = scan.getTable().getRowType(); - RelDataTypeField column = tableRowType.getField( - VirtualColumn.ROWISDELETED.getName(), false, false); - if (column == null) { - // This should not happen since Virtual columns are propagated for all native table scans in - // CalcitePlanner.genTableLogicalPlan() - throw new ColumnPropagationException("TableScan " + scan + " row schema does not contain " + - VirtualColumn.ROWISDELETED.getName() + " virtual column"); + return dispatcher.invoke(relNode, new Context()); + } + + private RelNode visitChild(RelNode parent, int i, RelNode child, Context context) { + RelNode newRel = dispatcher.invoke(child, context); + final List<RelNode> newInputs = new ArrayList<>(parent.getInputs()); + newInputs.set(i, newRel); + return parent.copy(parent.getTraitSet(), newInputs); + } + + private RelNode visitChildren(RelNode rel, Context context) { + for (Ord<RelNode> input : Ord.zip(rel.getInputs())) { + rel = visitChild(rel, input.i, input.e, context); } + return rel; + } + + public static class Context { + private final Map<Integer, RexNode> rowIdPredicates = new HashMap<>(); + } + + public RelNode visit(RelNode relNode, Context context) { + return visitChildren(relNode, context); + } + + // Add a project on top of the TS. + // Project two boolean columns: one for indicating the row is deleted another + // for newly inserted. 
+ // A row is considered to be + // - deleted when the ROW_IS_DELETED virtual column is true and the writeId of the record is higher than the one + // saved in materialized view snapshot metadata + // - newly inserted when the ROW_IS_DELETED virtual column is false and the writeId of the record is higher than the one + // saved in materialized view snapshot metadata + public RelNode visit(HiveTableScan scan, Context context) { + RelDataType tableRowType = scan.getTable().getRowType(); + RelDataTypeField rowIdField = getVirtualColumnField(tableRowType, VirtualColumn.ROWID, scan); + RexNode rowIdPredicate = context.rowIdPredicates.get(rowIdField.getIndex()); + + RelDataTypeField rowIsDeletedField = getVirtualColumnField(tableRowType, VirtualColumn.ROWISDELETED, scan); RexBuilder rexBuilder = relBuilder.getRexBuilder(); List<RexNode> projects = new ArrayList<>(tableRowType.getFieldCount()); List<String> projectNames = new ArrayList<>(tableRowType.getFieldCount()); populateProjects(rexBuilder, tableRowType, projects, projectNames); // Propagated column is already in the TS move it to the end - RexNode propagatedColumn = projects.remove(column.getIndex()); - projects.add(propagatedColumn); - String propagatedColumnName = projectNames.remove(column.getIndex()); - projectNames.add(propagatedColumnName); + RexNode rowIsDeleted = projects.remove(rowIsDeletedField.getIndex()); + projects.add(rowIsDeleted); + // Predicates on rowId are introduced by HiveAugmentMaterializationRule into the original MV definition query plan + // on top of each TS operator. + // Later that plan is transformed to a Union rewrite plan where all rowId predicates are pulled up on top of + // the top Join operator. + if (rowIdPredicate == null) { + // If a table has not changed then no predicate is introduced for the TS. All rows in the table should remain. 
+ projects.add(rexBuilder.makeLiteral(false)); + projects.add(rexBuilder.makeLiteral(false)); + } else { + // A row is deleted if ROW_IS_DELETED is true and rowId > <saved_rowId> + projects.add(rexBuilder.makeCall(SqlStdOperatorTable.AND, rowIsDeleted, rowIdPredicate)); + // A row is newly inserted if ROW_IS_DELETED is false and rowId > <saved_rowId> + projects.add(rexBuilder.makeCall(SqlStdOperatorTable.AND, + rexBuilder.makeCall(SqlStdOperatorTable.NOT, rowIsDeleted), rowIdPredicate)); + } + String rowIsDeletedName = projectNames.remove(rowIsDeletedField.getIndex()); + projectNames.add(rowIsDeletedName); + projectNames.add("_deleted"); Review Comment: Moved. ########## ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveRowIsDeletedPropagator.java: ########## @@ -92,86 +148,146 @@ public RelNode visit(HiveTableScan scan) { .build(); } - /** - * Create a new Project with original projected columns plus add rowIsDeleted as last column referencing - * the last column of the input {@link RelNode}. 
- * @param project - {@link HiveProject to transform} - * @return new Project - */ - @Override - public RelNode visit(HiveProject project) { - RelNode newProject = visitChild(project, 0, project.getInput()); + // Add the new columns(_deleted, _inserted) to the original project + public RelNode visit(HiveProject project, Context context) { + RelNode newProject = visitChild(project, 0, project.getInput(), context); RelNode projectInput = newProject.getInput(0); - int rowIsDeletedIndex = projectInput.getRowType().getFieldCount() - 1; - List<RexNode> newProjects = new ArrayList<>(project.getRowType().getFieldCount() + 1); - newProjects.addAll(project.getProjects()); - RexNode rowIsDeleted = relBuilder.getRexBuilder().makeInputRef( - projectInput.getRowType().getFieldList().get(rowIsDeletedIndex).getType(), rowIsDeletedIndex); - newProjects.add(rowIsDeleted); + List<RexNode> newProjects = new ArrayList<>(project.getProjects().size() + 2); + newProjects.addAll(project.getProjects()); + newProjects.add(createInputRef(projectInput, 2)); + newProjects.add(createInputRef(projectInput, 1)); return relBuilder - .push(projectInput) - .project(newProjects) - .build(); - } - - /** - * Create new Join and a Project on top of it. 
- * @param join - {@link HiveJoin} to transform - * @return - new Join with a Project on top - */ - @Override - public RelNode visit(HiveJoin join) { - // Propagate rowISDeleted to left input - RelNode tmpJoin = visitChild(join, 0, join.getInput(0)); - RelNode leftInput = tmpJoin.getInput(0); - RelDataType leftRowType = tmpJoin.getInput(0).getRowType(); - int leftRowIsDeletedIndex = leftRowType.getFieldCount() - 1; - // Propagate rowISDeleted to right input - tmpJoin = visitChild(join, 1, join.getInput(1)); - RelNode rightInput = tmpJoin.getInput(1); - RelDataType rightRowType = rightInput.getRowType(); - int rightRowIsDeletedIndex = rightRowType.getFieldCount() - 1; - - // Create input ref to rowIsDeleted columns in left and right inputs - RexBuilder rexBuilder = relBuilder.getRexBuilder(); - RexNode leftRowIsDeleted = rexBuilder.makeInputRef( - leftRowType.getFieldList().get(leftRowIsDeletedIndex).getType(), leftRowIsDeletedIndex); - RexNode rightRowIsDeleted = rexBuilder.makeInputRef( - rightRowType.getFieldList().get(rightRowIsDeletedIndex).getType(), - leftRowType.getFieldCount() + rightRowIsDeletedIndex); - - RexNode newJoinCondition; - int newLeftFieldCount; - if (join.getInput(0).getRowType().getField(VirtualColumn.ROWISDELETED.getName(), false, false) == null) { - // Shift column references refers columns coming from right input by one in join condition since the new left input - // has a new column - newJoinCondition = new InputRefShifter(leftRowType.getFieldCount() - 1, relBuilder) - .apply(join.getCondition()); - - newLeftFieldCount = leftRowType.getFieldCount() - 1; - } else { - newJoinCondition = join.getCondition(); - newLeftFieldCount = leftRowType.getFieldCount(); + .push(projectInput) + .project(newProjects) + .build(); + } + + // Union rewrite algorithm pulls up all the predicates on rowId on top of top Join operator: + // Example: + // HiveUnion(all=[true]) + // ... 
+ // HiveFilter(condition=[OR(<(1, $14.writeid), <(1, $6.writeid))]) + // HiveJoin(condition=[=($0, $8)], joinType=[inner], algorithm=[none], cost=[not available]) + // Check the filter condition and collect operands of OR expressions referencing only one column + public RelNode visit(HiveFilter filter, Context context) { + RexNode condition = filter.getCondition(); + + // The condition might be a single predicate on the rowId (if only one table changed) + RexInputRef rexInputRef = findPossibleRowIdRef(filter.getCondition()); + if (rexInputRef != null) { + context.rowIdPredicates.put(rexInputRef.getIndex(), filter.getCondition()); + return visitChild(filter, 0, filter.getInput(0), context); + } + + if (!condition.isA(SqlKind.OR)) { + return visitChild(filter, 0, filter.getInput(0), context); } + for (RexNode operand : ((RexCall)condition).operands) { + RexInputRef inputRef = findPossibleRowIdRef(operand); + if (inputRef != null) { + context.rowIdPredicates.put(inputRef.getIndex(), operand); + } + } + + return visitChild(filter, 0, filter.getInput(0), context); + } + + private RexInputRef findPossibleRowIdRef(RexNode operand) { + Set<RexInputRef> inputRefs = findRexInputRefs(operand); + if (inputRefs.size() != 1) { + return null; + } + + // This is a candidate for predicate on rowId + return inputRefs.iterator().next(); + } + + // Propagate new column to each side of the join. + // Create a project to combine the propagated expressions. + // Create a filter to remove rows which are joined from a deleted and a newly inserted row. + public RelNode visit(HiveJoin join, Context context) { + // Propagate columns to left input + RelNode tmpJoin = visitChild(join, 0, join.getInput(0), context); + RelNode newLeftInput = tmpJoin.getInput(0); + RelDataType newLeftRowType = newLeftInput.getRowType(); + // Propagate columns to right input. 
+ // All column references should be shifted in candidate predicates to the left + Context rightContext = new Context(); + int originalLeftFieldCount = join.getInput(0).getRowType().getFieldCount(); + for (Map.Entry<Integer, RexNode> entry : context.rowIdPredicates.entrySet()) { + if (entry.getKey() > originalLeftFieldCount) { + rightContext.rowIdPredicates.put(entry.getKey() - originalLeftFieldCount, + new InputRefShifter(originalLeftFieldCount, -originalLeftFieldCount, relBuilder).apply(entry.getValue())); + } + } + tmpJoin = visitChild(join, 1, join.getInput(1), rightContext); + RelNode newRightInput = tmpJoin.getInput(1); + RelDataType newRightRowType = newRightInput.getRowType(); + + // Create input refs to propagated columns in left and right inputs + int rightAnyDeletedIndex = newRightRowType.getFieldCount() - 2; + int rightAnyInsertedIndex = newRightRowType.getFieldCount() - 1; + RexBuilder rexBuilder = relBuilder.getRexBuilder(); + RexNode leftDeleted = createInputRef(newLeftInput, 2); + RexNode leftInserted = createInputRef(newLeftInput, 1); + RexNode rightDeleted = rexBuilder.makeInputRef( + newRightRowType.getFieldList().get(rightAnyDeletedIndex).getType(), + newLeftRowType.getFieldCount() + rightAnyDeletedIndex); + RexNode rightInserted = rexBuilder.makeInputRef( + newRightRowType.getFieldList().get(rightAnyInsertedIndex).getType(), + newLeftRowType.getFieldCount() + rightAnyInsertedIndex); + + // Shift column references refers columns coming from right input in join condition since the new left input + // has a new columns + int newLeftFieldCount = newLeftRowType.getFieldCount() - 2; + RexNode newJoinCondition = new InputRefShifter(newLeftFieldCount, 2, relBuilder).apply(join.getCondition()); + // Collect projected columns: all columns from both inputs - List<RexNode> projects = new ArrayList<>(newLeftFieldCount + rightRowType.getFieldCount() + 1); - List<String> projectNames = new ArrayList<>(newLeftFieldCount + rightRowType.getFieldCount() + 1); - 
populateProjects(rexBuilder, leftRowType, 0, newLeftFieldCount, projects, projectNames); - populateProjects(rexBuilder, rightRowType, leftRowType.getFieldCount(), rightRowType.getFieldCount(), projects, projectNames); + List<RexNode> projects = new ArrayList<>(newLeftFieldCount + newRightRowType.getFieldCount() + 1); + List<String> projectNames = new ArrayList<>(newLeftFieldCount + newRightRowType.getFieldCount() + 1); + populateProjects(rexBuilder, newLeftRowType, 0, newLeftFieldCount, projects, projectNames); + populateProjects(rexBuilder, newRightRowType, newLeftRowType.getFieldCount(), + newRightRowType.getFieldCount() - 2, projects, projectNames); + + // Create derived expressions + projects.add(rexBuilder.makeCall(SqlStdOperatorTable.OR, leftDeleted, rightDeleted)); + projects.add(rexBuilder.makeCall(SqlStdOperatorTable.OR, leftInserted, rightInserted)); + projectNames.add("_any_deleted"); Review Comment: Moved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For additional commands, e-mail: gitbox-h...@hive.apache.org