deniskuzZ commented on code in PR #4943: URL: https://github.com/apache/hive/pull/4943#discussion_r1495639814
########## ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveRowIsDeletedPropagator.java: ########## @@ -17,72 +17,128 @@ */ package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views; +import org.apache.calcite.linq4j.Ord; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.rex.RexVisitor; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ReflectUtil; +import org.apache.calcite.util.ReflectiveVisitor; +import org.apache.hadoop.hive.ql.ddl.view.materialized.alter.rebuild.AlterMaterializedViewRebuildAnalyzer; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelShuttle; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelShuttleImpl; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.util.Arrays.asList; /** - * {@link HiveRelShuttle} to propagate rowIsDeleted column to all HiveRelNodes' rowType in the plan. 
- * General rule: we expect that the rowIsDeleted column is the last column in the input rowType of the current + * {@link ReflectiveVisitor} to propagate row is deleted or inserted columns to all HiveRelNodes' rowType in the plan. + * General rule: we expect that these columns are the last columns in the input rowType of the current * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode}. + * + * This class is part of incremental rebuild of materialized view plan generation. + * <br> + * @see AlterMaterializedViewRebuildAnalyzer + * @see HiveAggregateInsertDeleteIncrementalRewritingRule */ -public class HiveRowIsDeletedPropagator extends HiveRelShuttleImpl { +public class HiveRowIsDeletedPropagator implements ReflectiveVisitor { - protected final RelBuilder relBuilder; + private final RelBuilder relBuilder; + private final ReflectUtil.MethodDispatcher<RelNode> dispatcher; public HiveRowIsDeletedPropagator(RelBuilder relBuilder) { this.relBuilder = relBuilder; + this.dispatcher = ReflectUtil.createMethodDispatcher( + RelNode.class, this, "visit", RelNode.class, Context.class); } public RelNode propagate(RelNode relNode) { - return relNode.accept(this); - } - - /** - * Create a Projection on top of TS that contains all columns from TS. - * Let rowIsDeleted the last column in the new Project. - * Enable fetching Deleted rows in TS. - * @param scan - TS to transform - * @return - new TS and a optionally a Project on top of it. 
- */ - @Override - public RelNode visit(HiveTableScan scan) { - RelDataType tableRowType = scan.getTable().getRowType(); - RelDataTypeField column = tableRowType.getField( - VirtualColumn.ROWISDELETED.getName(), false, false); - if (column == null) { - // This should not happen since Virtual columns are propagated for all native table scans in - // CalcitePlanner.genTableLogicalPlan() - throw new ColumnPropagationException("TableScan " + scan + " row schema does not contain " + - VirtualColumn.ROWISDELETED.getName() + " virtual column"); + return dispatcher.invoke(relNode, new Context()); + } + + private RelNode visitChild(RelNode parent, int i, RelNode child, Context context) { + RelNode newRel = dispatcher.invoke(child, context); + final List<RelNode> newInputs = new ArrayList<>(parent.getInputs()); + newInputs.set(i, newRel); + return parent.copy(parent.getTraitSet(), newInputs); + } + + private RelNode visitChildren(RelNode rel, Context context) { + for (Ord<RelNode> input : Ord.zip(rel.getInputs())) { + rel = visitChild(rel, input.i, input.e, context); } + return rel; + } + + public static class Context { + private final Map<Integer, RexNode> rowIdPredicates = new HashMap<>(); + } + + public RelNode visit(RelNode relNode, Context context) { + return visitChildren(relNode, context); + } + + // Add a project on top of the TS. + // Project two boolean columns: one for indicating the row is deleted another + // for newly inserted. 
+ // A row is considered to be + // - deleted when the ROW_IS_DELETED virtual column is true and the writeId of the record is higher than the + // saved in materialized view snapshot metadata + // - newly inserted when the ROW_IS_DELETED virtual column is false and the writeId of the record is higher than the + // saved in materialized view snapshot metadata + public RelNode visit(HiveTableScan scan, Context context) { + RelDataType tableRowType = scan.getTable().getRowType(); + RelDataTypeField rowIdField = getVirtualColumnField(tableRowType, VirtualColumn.ROWID, scan); + RexNode rowIdPredicate = context.rowIdPredicates.get(rowIdField.getIndex()); + + RelDataTypeField rowIsDeletedField = getVirtualColumnField(tableRowType, VirtualColumn.ROWISDELETED, scan); RexBuilder rexBuilder = relBuilder.getRexBuilder(); List<RexNode> projects = new ArrayList<>(tableRowType.getFieldCount()); List<String> projectNames = new ArrayList<>(tableRowType.getFieldCount()); populateProjects(rexBuilder, tableRowType, projects, projectNames); // Propagated column is already in the TS move it to the end - RexNode propagatedColumn = projects.remove(column.getIndex()); - projects.add(propagatedColumn); - String propagatedColumnName = projectNames.remove(column.getIndex()); - projectNames.add(propagatedColumnName); + RexNode rowIsDeleted = projects.remove(rowIsDeletedField.getIndex()); + projects.add(rowIsDeleted); + // predicates on rowId introduced by HiveAugmentMaterializationRule into the original MV definition query plan + // on top of each TS operators. + // Later that plan is transformed to a Union rewrite plan where all rowId predicates are pulled up on top of + // the top Join operator. + if (rowIdPredicate == null) { + // If a table have not changed then no predicate is introduced for the TS. All rows in the table should remain. 
+ projects.add(rexBuilder.makeLiteral(false)); + projects.add(rexBuilder.makeLiteral(false)); + } else { + // A row is deleted if ROW_IS_DELETED is true and rowId > <saved_rowId> + projects.add(rexBuilder.makeCall(SqlStdOperatorTable.AND, rowIsDeleted, rowIdPredicate)); + // A row is newly inserted if ROW_IS_DELETED is false and rowId > <saved_rowId> + projects.add(rexBuilder.makeCall(SqlStdOperatorTable.AND, + rexBuilder.makeCall(SqlStdOperatorTable.NOT, rowIsDeleted), rowIdPredicate)); + } + String rowIsDeletedName = projectNames.remove(rowIsDeletedField.getIndex()); + projectNames.add(rowIsDeletedName); + projectNames.add("_deleted"); Review Comment: Should we move that string literal ("_deleted") to a constant? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For additional commands, e-mail: gitbox-h...@hive.apache.org