http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java index 9a3f81c..22b052c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java @@ -27,11 +27,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.Stack; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; @@ -74,6 +75,8 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.Statistics; import org.apache.hadoop.hive.ql.plan.Statistics.State; +import org.apache.hadoop.hive.ql.plan.mapper.RuntimeStatsSource; +import org.apache.hadoop.hive.ql.stats.OperatorStats; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; @@ -132,7 +135,9 @@ public class StatsRulesProcFactory { try { // gather statistics for the first time and the attach it to table scan operator Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, colStatsCached, table, tsop); - tsop.setStatistics(stats.clone()); + + stats = 
applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) tsop); + tsop.setStatistics(stats); if (LOG.isDebugEnabled()) { LOG.debug("[0] STATS-" + tsop.toString() + " (" + table.getTableName() + "): " + @@ -144,6 +149,7 @@ public class StatsRulesProcFactory { } return null; } + } /** @@ -181,14 +187,15 @@ public class StatsRulesProcFactory { if (satisfyPrecondition(parentStats)) { // this will take care of mapping between input column names and output column names. The // returned column stats will have the output column names. - List<ColStatistics> colStats = StatsUtils.getColStatisticsFromExprMap(conf, parentStats, - sop.getColumnExprMap(), sop.getSchema()); + List<ColStatistics> colStats = + StatsUtils.getColStatisticsFromExprMap(conf, parentStats, sop.getColumnExprMap(), sop.getSchema()); stats.setColumnStats(colStats); // in case of select(*) the data size does not change if (!sop.getConf().isSelectStar() && !sop.getConf().isSelStarNoCompute()) { long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats); stats.setDataSize(dataSize); } + stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) sop); sop.setStatistics(stats); if (LOG.isDebugEnabled()) { @@ -196,7 +203,8 @@ public class StatsRulesProcFactory { } } else { if (parentStats != null) { - sop.setStatistics(parentStats.clone()); + stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) sop); + sop.setStatistics(stats); if (LOG.isDebugEnabled()) { LOG.debug("[1] STATS-" + sop.toString() + ": " + parentStats.extendedToString()); @@ -299,7 +307,10 @@ public class StatsRulesProcFactory { LOG.debug("[1] STATS-" + fop.toString() + ": " + st.extendedToString()); } } + + st = applyRuntimeStats(aspCtx.getParseContext().getContext(), st, (Operator<?>) fop); fop.setStatistics(st); + aspCtx.setAndExprStats(null); } return null; @@ -1249,6 +1260,7 @@ public class StatsRulesProcFactory { } } + stats = 
applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) gop); gop.setStatistics(stats); if (LOG.isDebugEnabled() && stats != null) { @@ -1576,6 +1588,7 @@ public class StatsRulesProcFactory { } } + stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, jop); jop.setStatistics(stats); if (LOG.isDebugEnabled()) { @@ -1665,6 +1678,7 @@ public class StatsRulesProcFactory { } } + wcStats = applyRuntimeStats(aspCtx.getParseContext().getContext(), wcStats, jop); jop.setStatistics(wcStats); if (LOG.isDebugEnabled()) { @@ -2215,6 +2229,7 @@ public class StatsRulesProcFactory { @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx; LimitOperator lop = (LimitOperator) nd; Operator<? extends OperatorDesc> parent = lop.getParentOperators().get(0); Statistics parentStats = parent.getStatistics(); @@ -2232,6 +2247,7 @@ public class StatsRulesProcFactory { if (limit <= parentStats.getNumRows()) { updateStats(stats, limit, true, lop); } + stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, (Operator<?>) lop); lop.setStatistics(stats); if (LOG.isDebugEnabled()) { @@ -2243,7 +2259,8 @@ public class StatsRulesProcFactory { // in the absence of column statistics, compute data size based on // based on average row size limit = StatsUtils.getMaxIfOverflow(limit); - Statistics wcStats = parentStats.scaleToRowCount(limit); + Statistics wcStats = parentStats.scaleToRowCount(limit, true); + wcStats = applyRuntimeStats(aspCtx.getParseContext().getContext(), wcStats, (Operator<?>) lop); lop.setStatistics(wcStats); if (LOG.isDebugEnabled()) { LOG.debug("[1] STATS-" + lop.toString() + ": " + wcStats.extendedToString()); @@ -2265,8 +2282,7 @@ public class StatsRulesProcFactory { public static class ReduceSinkStatsRule extends DefaultStatsRule implements NodeProcessor { @Override - public Object 
process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, - Object... nodeOutputs) throws SemanticException { + public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { ReduceSinkOperator rop = (ReduceSinkOperator) nd; Operator<? extends OperatorDesc> parent = rop.getParentOperators().get(0); Statistics parentStats = parent.getStatistics(); @@ -2283,8 +2299,7 @@ public class StatsRulesProcFactory { String prefixedKey = Utilities.ReduceField.KEY.toString() + "." + key; ExprNodeDesc end = colExprMap.get(prefixedKey); if (end != null) { - ColStatistics cs = StatsUtils - .getColStatisticsFromExpression(conf, parentStats, end); + ColStatistics cs = StatsUtils.getColStatisticsFromExpression(conf, parentStats, end); if (cs != null) { cs.setColumnName(prefixedKey); colStats.add(cs); @@ -2296,8 +2311,7 @@ public class StatsRulesProcFactory { String prefixedVal = Utilities.ReduceField.VALUE.toString() + "." + val; ExprNodeDesc end = colExprMap.get(prefixedVal); if (end != null) { - ColStatistics cs = StatsUtils - .getColStatisticsFromExpression(conf, parentStats, end); + ColStatistics cs = StatsUtils.getColStatisticsFromExpression(conf, parentStats, end); if (cs != null) { cs.setColumnName(prefixedVal); colStats.add(cs); @@ -2307,6 +2321,8 @@ public class StatsRulesProcFactory { outStats.setColumnStats(colStats); } + + outStats = applyRuntimeStats(aspCtx.getParseContext().getContext(), outStats, (Operator<?>) rop); rop.setStatistics(outStats); if (LOG.isDebugEnabled()) { LOG.debug("[0] STATS-" + rop.toString() + ": " + outStats.extendedToString()); @@ -2355,6 +2371,7 @@ public class StatsRulesProcFactory { LOG.debug("[0] STATS-" + op.toString() + ": " + stats.extendedToString()); } } + stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, op); op.getConf().setStatistics(stats); } } @@ -2473,4 +2490,24 @@ public class StatsRulesProcFactory { return stats != null && 
stats.getBasicStatsState().equals(Statistics.State.COMPLETE) && !stats.getColumnStatsState().equals(Statistics.State.NONE); } + + + private static Statistics applyRuntimeStats(Context context, Statistics stats, Operator<?> op) { + if (!context.getRuntimeStatsSource().isPresent()) { + return stats; + } + RuntimeStatsSource rss = context.getRuntimeStatsSource().get(); + + Optional<OperatorStats> os = rss.lookup(op); + + if (!os.isPresent()) { + return stats; + } + LOG.debug("using runtime stats for {}; {}", op, os.get()); + Statistics outStats = stats.clone(); + outStats = outStats.scaleToRowCount(os.get().getOutputRecords(), false); + outStats.setRuntimeStats(true); + return outStats; + } + }
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g index 78cbf25..a1ec96c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g @@ -303,6 +303,7 @@ KW_COMPACTIONS: 'COMPACTIONS'; KW_TRANSACTIONS: 'TRANSACTIONS'; KW_REWRITE : 'REWRITE'; KW_AUTHORIZATION: 'AUTHORIZATION'; +KW_REOPTIMIZATION: 'REOPTIMIZATION'; KW_CONF: 'CONF'; KW_VALUES: 'VALUES'; KW_RELOAD: 'RELOAD'; http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index 0c6aece..3abc752 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -774,14 +774,21 @@ explainStatement : KW_EXPLAIN ( explainOption* execStatement -> ^(TOK_EXPLAIN execStatement explainOption*) | - KW_REWRITE queryStatementExpression -> ^(TOK_EXPLAIN_SQ_REWRITE queryStatementExpression)) + KW_REWRITE queryStatementExpression -> ^(TOK_EXPLAIN_SQ_REWRITE queryStatementExpression) + ) ; explainOption @init { msgs.push("explain option"); } @after { msgs.pop(); } - : KW_EXTENDED|KW_FORMATTED|KW_DEPENDENCY|KW_LOGICAL|KW_AUTHORIZATION|KW_ANALYZE| - (KW_VECTORIZATION vectorizationOnly? vectorizatonDetail?) + : KW_EXTENDED + | KW_FORMATTED + | KW_DEPENDENCY + | KW_LOGICAL + | KW_AUTHORIZATION + | KW_ANALYZE + | KW_REOPTIMIZATION + | (KW_VECTORIZATION vectorizationOnly? vectorizatonDetail?) 
; vectorizationOnly http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g index 35f9edf..2bba33f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g @@ -832,6 +832,7 @@ nonReserved | KW_ZONE | KW_TIMESTAMPTZ | KW_DEFAULT + | KW_REOPTIMIZATION | KW_RESOURCE | KW_PLAN | KW_PLANS | KW_QUERY_PARALLELISM | KW_ACTIVATE | KW_MOVE | KW_DO | KW_POOL | KW_ALLOC_FRACTION | KW_SCHEDULING_POLICY | KW_PATH | KW_MAPPING | KW_WORKLOAD | KW_MANAGEMENT | KW_ACTIVE | KW_UNMANAGED http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java index 714cf39..e04a783 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java @@ -22,12 +22,9 @@ package org.apache.hadoop.hive.ql.plan; import java.util.HashMap; import java.util.Map; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.PTFUtils; import org.apache.hadoop.hive.ql.plan.Explain.Level; -import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; -public class AbstractOperatorDesc implements OperatorDesc { +public abstract class AbstractOperatorDesc implements OperatorDesc { protected boolean vectorMode = false; @@ -125,10 +122,12 @@ public class AbstractOperatorDesc implements OperatorDesc { this.memAvailable = memoryAvailble; } + @Override public String 
getRuntimeStatsTmpDir() { return runtimeStatsTmpDir; } + @Override public void setRuntimeStatsTmpDir(String runtimeStatsTmpDir) { this.runtimeStatsTmpDir = runtimeStatsTmpDir; } @@ -161,4 +160,9 @@ public class AbstractOperatorDesc implements OperatorDesc { this.colExprMap = colExprMap; } + @Override + public void fillSignature(Map<String, Object> ret) { + throw new RuntimeException(); + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java index 7d5be6b..a68371a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java @@ -19,14 +19,15 @@ package org.apache.hadoop.hive.ql.plan; import java.io.IOException; -import java.util.List; import java.util.Objects; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; import org.apache.hadoop.io.DataOutputBuffer; + @SuppressWarnings("serial") @Explain(displayName = "Application Master Event Operator") public class AppMasterEventDesc extends AbstractOperatorDesc { @@ -36,11 +37,13 @@ public class AppMasterEventDesc extends AbstractOperatorDesc { private String inputName; @Explain(displayName = "Target Vertex") + @Signature public String getVertexName() { return vertexName; } @Explain(displayName = "Target Input") + @Signature public String getInputName() { return inputName; } @@ -53,6 +56,7 @@ public class AppMasterEventDesc extends AbstractOperatorDesc { this.vertexName = vertexName; } + @Signature public TableDesc getTable() { return table; } @@ -98,4 +102,5 @@ public class AppMasterEventDesc extends 
AbstractOperatorDesc { } return false; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java index 7332693..5a81add 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; + +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -38,6 +40,7 @@ public class CommonMergeJoinDesc extends MapJoinDesc implements Serializable { this.mapJoinConversionPos = mapJoinConversionPos; } + @Signature public int getNumBuckets() { return numBuckets; } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java index 5d3fdb8..32c6c6f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java @@ -23,6 +23,7 @@ import java.util.Objects; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -67,6 +68,7 @@ public class DynamicPruningEventDesc extends AppMasterEventDesc { return targetColumnName + " (" + targetColumnType + ")"; 
} + @Signature public String getTargetColumnName() { return targetColumnName; } @@ -75,6 +77,7 @@ public class DynamicPruningEventDesc extends AppMasterEventDesc { this.targetColumnName = columnName; } + @Signature public String getTargetColumnType() { return targetColumnType; } @@ -94,6 +97,7 @@ public class DynamicPruningEventDesc extends AppMasterEventDesc { } @Explain(displayName = "Partition key expr") + @Signature public String getPartKeyString() { return this.partKey.getExprString(); } @@ -112,4 +116,5 @@ public class DynamicPruningEventDesc extends AppMasterEventDesc { } return false; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index ce61fc5..e15a49f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -25,6 +25,7 @@ import java.util.Objects; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; @@ -191,6 +192,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe } @Explain(displayName = "directory", explainLevels = { Level.EXTENDED }) + @Signature public Path getDirName() { return dirName; } @@ -214,6 +216,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe } @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + @Signature public TableDesc getTableInfo() { return tableInfo; } @@ -223,6 +226,7 @@ public class FileSinkDesc 
extends AbstractOperatorDesc implements IStatsGatherDe } @Explain(displayName = "compressed") + @Signature public boolean getCompressed() { return compressed; } @@ -232,6 +236,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe } @Explain(displayName = "GlobalTableId", explainLevels = { Level.EXTENDED }) + @Signature + public int getDestTableId() { return destTableId; } @@ -260,6 +266,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe * @return the multiFileSpray */ @Explain(displayName = "MultiFileSpray", explainLevels = { Level.EXTENDED }) + @Signature + public boolean isMultiFileSpray() { return multiFileSpray; } @@ -311,6 +319,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe * @return the totalFiles */ @Explain(displayName = "TotalFiles", explainLevels = { Level.EXTENDED }) + @Signature + public int getTotalFiles() { return totalFiles; } @@ -340,6 +350,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe * @return the numFiles */ @Explain(displayName = "NumFilesPerFileSink", explainLevels = { Level.EXTENDED }) + @Signature + public int getNumFiles() { return numFiles; } @@ -364,6 +376,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe } @Explain(displayName = "Static Partition Specification", explainLevels = { Level.EXTENDED }) + @Signature public String getStaticSpec() { return staticSpec; } @@ -374,6 +387,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe @Override @Explain(displayName = "GatherStats", explainLevels = { Level.EXTENDED }) + @Signature + public boolean isGatherStats() { return gatherStats; } @@ -391,6 +406,9 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe */ @Override @Explain(displayName = "Stats Publishing Key Prefix", explainLevels = { Level.EXTENDED }) + // FIXME: including this in the signature will 
almost certainly differ even if the operator is doing the same + // there might be conflicting usages of logicalCompare? + @Signature public String getStatsAggPrefix() { // dirName uniquely identifies destination directory of a FileSinkOperator. // If more than one FileSinkOperator write to the same partition, this dirName http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java index d59834c..fc7327a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; @@ -109,6 +110,7 @@ public class FilterDesc extends AbstractOperatorDesc { this.sampleDescr = sampleDescr; } + @Signature public String getPredicateString() { return PlanUtils.getExprListString(Arrays.asList(predicate)); } @@ -137,6 +139,7 @@ public class FilterDesc extends AbstractOperatorDesc { } @Explain(displayName = "isSamplingPred", explainLevels = { Level.EXTENDED }) + @Signature public boolean getIsSamplingPred() { return isSamplingPred; } @@ -154,6 +157,7 @@ public class FilterDesc extends AbstractOperatorDesc { } @Explain(displayName = "sampleDesc", explainLevels = { Level.EXTENDED }) + @Signature public String getSampleDescExpr() { return sampleDescr == null ? 
null : sampleDescr.toString(); } @@ -234,4 +238,5 @@ public class FilterDesc extends AbstractOperatorDesc { } return false; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java index 86cc77d..31237c8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java @@ -25,16 +25,12 @@ import java.util.Objects; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationDesc; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.udf.UDFType; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hive.common.util.AnnotationUtils; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; /** @@ -129,6 +125,7 @@ public class GroupByDesc extends AbstractOperatorDesc { } @Explain(displayName = "mode") + @Signature public String getModeString() { switch (mode) { case COMPLETE: @@ -155,6 +152,7 @@ public class GroupByDesc extends AbstractOperatorDesc { } @Explain(displayName = "keys") + @Signature public String getKeyString() { return PlanUtils.getExprListString(keys); } @@ -173,6 +171,7 @@ public class GroupByDesc extends AbstractOperatorDesc { } @Explain(displayName = "outputColumnNames") + 
@Signature public ArrayList<java.lang.String> getOutputColumnNames() { return outputColumnNames; } @@ -183,6 +182,7 @@ public class GroupByDesc extends AbstractOperatorDesc { } @Explain(displayName = "pruneGroupingSetId", displayOnlyOnTrue = true) + @Signature public boolean pruneGroupingSetId() { return groupingSetPosition >= 0 && outputColumnNames.size() != keys.size() + aggregators.size(); @@ -210,6 +210,7 @@ public class GroupByDesc extends AbstractOperatorDesc { } @Explain(displayName = "aggregations", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + @Signature public List<String> getAggregatorStrings() { List<String> res = new ArrayList<String>(); for (AggregationDesc agg: aggregators) { @@ -235,6 +236,7 @@ public class GroupByDesc extends AbstractOperatorDesc { } @Explain(displayName = "bucketGroup", displayOnlyOnTrue = true) + @Signature public boolean getBucketGroup() { return bucketGroup; } @@ -424,4 +426,5 @@ public class GroupByDesc extends AbstractOperatorDesc { return false; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java index 9c651ab..a61a47e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Objects; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -289,6 +290,7 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable { @Override @Explain(displayName = "filter mappings", explainLevels = { Level.EXTENDED }) + @Signature public Map<Integer, String> 
getFilterMapString() { return toCompactString(filterMap); } @@ -304,6 +306,7 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable { /** * @return the keys in string form */ + @Override @Explain(displayName = "keys") public Map<Byte, String> getKeysString() { Map<Byte, String> keyMap = new LinkedHashMap<Byte, String>(); @@ -313,6 +316,7 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable { return keyMap; } + @Override @Explain(displayName = "keys", explainLevels = { Level.USER }) public Map<Byte, String> getUserLevelExplainKeysString() { Map<Byte, String> keyMap = new LinkedHashMap<Byte, String>(); @@ -399,4 +403,5 @@ public class HashTableSinkDesc extends JoinDesc implements Serializable { } return false; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java index 6dcf05a..ea22131 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java @@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory; /** * Join conditions Descriptor implementation. - * + * */ public class JoinCondDesc implements Serializable { private static final long serialVersionUID = 1L; @@ -153,7 +153,7 @@ public class JoinCondDesc implements Serializable { @Explain(explainLevels = { Level.USER }) public String getUserLevelJoinCondString() { - JSONObject join = new JSONObject(new LinkedHashMap()); + JSONObject join = new JSONObject(new LinkedHashMap<>()); try { switch (type) { case JoinDesc.INNER_JOIN: @@ -200,4 +200,6 @@ public class JoinCondDesc implements Serializable { } return true; } + + // XXX: is hashCode missing here? 
} http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java index bd45c75..5b7f4c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java @@ -30,6 +30,7 @@ import java.util.Objects; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -229,6 +230,7 @@ public class JoinDesc extends AbstractOperatorDesc { * @return the keys in string form */ @Explain(displayName = "keys") + @Signature public Map<Byte, String> getKeysString() { if (joinKeys == null) { return null; @@ -266,6 +268,7 @@ public class JoinDesc extends AbstractOperatorDesc { * @return Map from alias to filters on the alias. 
*/ @Explain(displayName = "filter predicates") + @Signature public Map<Byte, String> getFiltersStringMap() { if (getFilters() == null || getFilters().size() == 0) { return null; @@ -342,6 +345,7 @@ public class JoinDesc extends AbstractOperatorDesc { } @Explain(displayName = "outputColumnNames") + @Signature public List<String> getOutputColumnNames() { return outputColumnNames; } @@ -365,6 +369,7 @@ public class JoinDesc extends AbstractOperatorDesc { } @Explain(displayName = "condition map", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + @Signature public List<JoinCondDesc> getCondsList() { if (conds == null) { return null; @@ -425,6 +430,7 @@ public class JoinDesc extends AbstractOperatorDesc { } @Explain(displayName = "handleSkewJoin", displayOnlyOnTrue = true) + @Signature public boolean getHandleSkewJoin() { return handleSkewJoin; } @@ -524,6 +530,7 @@ public class JoinDesc extends AbstractOperatorDesc { } @Explain(displayName = "nullSafes") + @Signature public String getNullSafeString() { if (nullsafes == null) { return null; @@ -740,4 +747,5 @@ public class JoinDesc extends AbstractOperatorDesc { } return false; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java index 3837a49..85a4683 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LateralViewJoinDesc.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.plan; import java.util.ArrayList; import java.util.Objects; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -49,6 +50,7 @@ public class LateralViewJoinDesc extends AbstractOperatorDesc { 
} @Explain(displayName = "outputColumnNames") + @Signature public ArrayList<String> getOutputInternalColNames() { return outputInternalColNames; } @@ -74,4 +76,5 @@ public class LateralViewJoinDesc extends AbstractOperatorDesc { } return false; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java index ce53fea..698af94 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.plan; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; @@ -58,6 +59,7 @@ public class LimitDesc extends AbstractOperatorDesc { this.offset = offset; } + @Signature @Explain(displayName = "Number of rows", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public int getLimit() { return limit; @@ -100,4 +102,5 @@ public class LimitDesc extends AbstractOperatorDesc { } return false; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java index cf4ab60..91ea159 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java @@ -33,6 +33,7 @@ import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo; +import 
org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType; @@ -139,6 +140,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable { } @Explain(displayName = "input vertices", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + @Signature public Map<Integer, String> getParentToInput() { return parentToInput; } @@ -156,6 +158,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable { } @Explain(displayName = "Estimated key counts", explainLevels = { Level.EXTENDED }) + @Signature public String getKeyCountsExplainDesc() { StringBuilder result = null; for (Map.Entry<Integer, Long> entry : parentKeyCounts.entrySet()) { @@ -250,6 +253,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable { * @return the position of the big table not in memory */ @Explain(displayName = "Position of Big Table", explainLevels = { Level.EXTENDED }) + @Signature public int getPosBigTable() { return posBigTable; } @@ -340,6 +344,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable { } @Explain(displayName = "BucketMapJoin", explainLevels = { Level.USER, Level.EXTENDED }, displayOnlyOnTrue = true) + @Signature public boolean isBucketMapJoin() { return isBucketMapJoin; } @@ -607,4 +612,5 @@ public class MapJoinDesc extends JoinDesc implements Serializable { } return false; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java index 870b4d9..e8a5827 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java @@ -34,7 +34,10 @@ public interface OperatorDesc extends Serializable, Cloneable { public void setMaxMemoryAvailable(long memoryAvailble); public String getRuntimeStatsTmpDir(); public void setRuntimeStatsTmpDir(String runtimeStatsTmpDir); + boolean isSame(OperatorDesc other); public Map<String, ExprNodeDesc> getColumnExprMap(); public void setColumnExprMap(Map<String, ExprNodeDesc> colExprMap); + + void fillSignature(Map<String, Object> ret); } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index bf24ff8..f2955af 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -27,6 +27,7 @@ import java.util.Objects; import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc.ReduceSinkKeyType; @@ -97,7 +98,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { private float topNMemoryUsage = -1; private boolean mapGroupBy; // for group-by, values with same key on top-K should be forwarded //flag used to control how TopN handled for PTF/Windowing partitions. 
- private boolean isPTFReduceSink = false; + private boolean isPTFReduceSink = false; private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable private boolean forwarding; // Whether this RS can forward records directly instead of shuffling/sorting @@ -206,6 +207,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { return PlanUtils.getExprListString(keyCols); } + @Signature public java.util.ArrayList<ExprNodeDesc> getKeyCols() { return keyCols; } @@ -227,6 +229,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { return PlanUtils.getExprListString(valueCols); } + @Signature public java.util.ArrayList<ExprNodeDesc> getValueCols() { return valueCols; } @@ -245,6 +248,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { return PlanUtils.getExprListString(partitionCols, true); } + @Signature public java.util.ArrayList<ExprNodeDesc> getPartitionCols() { return partitionCols; } @@ -261,6 +265,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { return false; } + @Signature @Explain(displayName = "tag", explainLevels = { Level.EXTENDED }) public int getTag() { return tag; @@ -270,6 +275,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { this.tag = tag; } + @Signature public int getTopN() { return topN; } @@ -349,6 +355,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { * of the same length as key columns, that consists of only "+" * (ascending order) and "-" (descending order). 
*/ + @Signature @Explain(displayName = "sort order") public String getOrder() { return keySerializeInfo.getProperties().getProperty( @@ -437,6 +444,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { return forwarding; } + @Signature @Explain(displayName = "auto parallelism", explainLevels = { Level.EXTENDED }) public final boolean isAutoParallel() { return (this.reduceTraits.contains(ReducerTraits.AUTOPARALLEL)); @@ -462,7 +470,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { // reducers or hash function. boolean wasUnset = this.reduceTraits.remove(ReducerTraits.UNSET); - + if (this.reduceTraits.contains(ReducerTraits.FIXED)) { return; } else if (traits.contains(ReducerTraits.FIXED)) { @@ -661,4 +669,5 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { } return false; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java index 858de98..53fca99 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ScriptDesc.java @@ -18,12 +18,13 @@ package org.apache.hadoop.hive.ql.plan; +import java.util.Objects; + import org.apache.hadoop.hive.ql.exec.RecordReader; import org.apache.hadoop.hive.ql.exec.RecordWriter; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; -import java.util.Objects; - /** * ScriptDesc. 
@@ -63,6 +64,7 @@ public class ScriptDesc extends AbstractOperatorDesc { this.scriptErrInfo = scriptErrInfo; } + @Signature @Explain(displayName = "command", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getScriptCmd() { return scriptCmd; @@ -72,6 +74,7 @@ public class ScriptDesc extends AbstractOperatorDesc { this.scriptCmd = scriptCmd; } + @Signature @Explain(displayName = "output info", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public TableDesc getScriptOutputInfo() { return scriptOutputInfo; @@ -154,4 +157,5 @@ public class ScriptDesc extends AbstractOperatorDesc { } return false; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java index e38e7e4..51b94fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SelectDesc.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; @@ -72,6 +73,7 @@ public class SelectDesc extends AbstractOperatorDesc { return ret; } + @Signature @Explain(displayName = "expressions") public String getColListString() { return PlanUtils.getExprListString(colList); @@ -86,6 +88,7 @@ public class SelectDesc extends AbstractOperatorDesc { this.colList = colList; } + @Signature @Explain(displayName = "outputColumnNames") public List<java.lang.String> getOutputColumnNames() { return outputColumnNames; @@ -101,6 +104,7 @@ public class SelectDesc extends AbstractOperatorDesc { this.outputColumnNames = outputColumnNames; } + @Signature 
@Explain(displayName = "SELECT * ") public String explainNoCompute() { if (isSelStarNoCompute()) { @@ -184,4 +188,5 @@ public class SelectDesc extends AbstractOperatorDesc { } return false; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java index 0057f0c..fd461ae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java @@ -49,6 +49,7 @@ public class Statistics implements Serializable { private State basicStatsState; private Map<String, ColStatistics> columnStats; private State columnStatsState; + private boolean runtimeStats; public Statistics() { this(0, 0); @@ -119,6 +120,9 @@ public class Statistics implements Serializable { @Explain(displayName = "Statistics") public String toString() { StringBuilder sb = new StringBuilder(); + if (runtimeStats) { + sb.append("(RUNTIME) "); + } sb.append("Num rows: "); sb.append(numRows); if (runTimeNumRows >= 0) { @@ -136,6 +140,9 @@ public class Statistics implements Serializable { @Explain(displayName = "Statistics", explainLevels = { Level.USER }) public String toUserLevelExplainString() { StringBuilder sb = new StringBuilder(); + if (runtimeStats) { + sb.append("runtime: "); + } sb.append("rows="); sb.append(numRows); if (runTimeNumRows >= 0) { @@ -153,6 +160,9 @@ public class Statistics implements Serializable { public String extendedToString() { StringBuilder sb = new StringBuilder(); + if (runtimeStats) { + sb.append(" (runtime) "); + } sb.append(" numRows: "); sb.append(numRows); sb.append(" dataSize: "); @@ -179,6 +189,8 @@ public class Statistics implements Serializable { } clone.setColumnStats(cloneColStats); } + // TODO: this boolean flag is set only by RS stats 
annotation at this point + //clone.setRuntimeStats(runtimeStats); return clone; } @@ -300,10 +312,13 @@ public class Statistics implements Serializable { this.runTimeNumRows = runTimeNumRows; } - public Statistics scaleToRowCount(long newRowCount) { + public Statistics scaleToRowCount(long newRowCount, boolean downScaleOnly) { Statistics ret; ret = clone(); - if(numRows == 0 || newRowCount >= numRows) { + if (numRows == 0) { + return ret; + } + if (downScaleOnly && newRowCount >= numRows) { return ret; } // FIXME: using real scaling by new/old ration might yield better results? @@ -311,4 +326,12 @@ public class Statistics implements Serializable { ret.dataSize = StatsUtils.safeMult(getAvgRowSize(), newRowCount); return ret; } + + public boolean isRuntimeStats() { + return runtimeStats; + } + + public void setRuntimeStats(final boolean runtimeStats) { + this.runtimeStats = runtimeStats; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index 59968fa..57df7e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.parse.TableSample; import org.apache.hadoop.hive.ql.plan.BaseWork.BaseExplainVectorization; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -156,10 +157,20 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD } 
@Explain(displayName = "alias") + // FIXME: this might not needed to be in the signature; but in that case the compare shouldn't consider it either! + @Signature public String getAlias() { return alias; } + @Signature + public String getPredicateString() { + if (filterExpr == null) { + return null; + } + return PlanUtils.getExprListString(Arrays.asList(filterExpr)); + } + @Explain(displayName = "table", jsonOnly = true) public String getTableName() { return this.tableName; @@ -219,6 +230,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD return PlanUtils.getExprListString(Arrays.asList(filterExpr)); } + // @Signature // XXX public ExprNodeGenericFuncDesc getFilterExpr() { return filterExpr; } @@ -296,6 +308,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD @Override @Explain(displayName = "GatherStats", explainLevels = { Level.EXTENDED }) + @Signature public boolean isGatherStats() { return gatherStats; } @@ -347,6 +360,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD this.rowLimit = rowLimit; } + @Signature public int getRowLimit() { return rowLimit; } @@ -372,6 +386,11 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD return isMetadataOnly; } + // @Signature + public String getQualifiedTable() { + return tableMetadata.getFullyQualifiedName(); + } + public Table getTableMetadata() { return tableMetadata; } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java index cf8e6e5..adcf707 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/UDTFDesc.java @@ -19,10 +19,12 @@ package 
org.apache.hadoop.hive.ql.plan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; -import org.apache.hadoop.hive.ql.plan.Explain.Level; import java.util.Objects; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; +import org.apache.hadoop.hive.ql.plan.Explain.Level; + /** * All member variables should have a setters and getters of the form get<member @@ -54,6 +56,7 @@ public class UDTFDesc extends AbstractOperatorDesc { } @Explain(displayName = "function name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + @Signature public String getUDTFName() { return genericUDTF.toString(); } @@ -67,6 +70,7 @@ public class UDTFDesc extends AbstractOperatorDesc { } @Explain(displayName = "outer lateral view") + @Signature public String isOuterLateralView() { return outerLV ? "true" : null; } @@ -80,4 +84,5 @@ public class UDTFDesc extends AbstractOperatorDesc { } return false; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java new file mode 100644 index 0000000..57762ed --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan.mapper; + +public class EmptyStatsSource implements StatsSource { + + @Override + public boolean canProvideStatsFor(Class<?> class1) { + return false; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/GroupTransformer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/GroupTransformer.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/GroupTransformer.java new file mode 100644 index 0000000..7b9e99e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/GroupTransformer.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.plan.mapper; + +import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.LinkGroup; + +public interface GroupTransformer { + void map(LinkGroup group); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java new file mode 100644 index 0000000..36d7e58 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapper.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan.mapper; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Enables to connect related objects to eachother. 
+ * + * Most importantly it aids to connect Operators to OperatorStats and probably RelNodes. + */ +public class PlanMapper { + + Set<LinkGroup> groups = new HashSet<>(); + private Map<Object, LinkGroup> objectMap = new HashMap<>(); + + public class LinkGroup { + Set<Object> members = new HashSet<>(); + + public void add(Object o) { + members.add(o); + objectMap.put(o, this); + } + + @SuppressWarnings("unchecked") + public <T> List<T> getAll(Class<T> clazz) { + List<T> ret = new ArrayList<>(); + for (Object m : members) { + if (clazz.isInstance(m)) { + ret.add((T) m); + } + } + return ret; + } + } + + public void link(Object o1, Object o2) { + LinkGroup g1 = objectMap.get(o1); + LinkGroup g2 = objectMap.get(o2); + if (g1 != null && g2 != null && g1 != g2) { + throw new RuntimeException("equivalence mapping violation"); + } + LinkGroup targetGroup = (g1 != null) ? g1 : (g2 != null ? g2 : new LinkGroup()); + groups.add(targetGroup); + targetGroup.add(o1); + targetGroup.add(o2); + } + + public <T> List<T> getAll(Class<T> clazz) { + List<T> ret = new ArrayList<>(); + for (LinkGroup g : groups) { + ret.addAll(g.getAll(clazz)); + } + return ret; + } + + public void runMapper(GroupTransformer mapper) { + for (LinkGroup equivGroup : groups) { + mapper.map(equivGroup); + } + } + + public <T> List<T> lookupAll(Class<T> clazz, Object key) { + LinkGroup group = objectMap.get(key); + if (group == null) { + throw new NoSuchElementException(Objects.toString(key)); + } + return group.getAll(clazz); + } + + public <T> T lookup(Class<T> clazz, Object key) { + List<T> all = lookupAll(clazz, key); + if (all.size() != 1) { + // FIXME: use a different exception type? 
+ throw new IllegalArgumentException("Expected match count is 1; but got:" + all); + } + return all.get(0); + } + + @VisibleForTesting + public Iterator<LinkGroup> iterateGroups() { + return groups.iterator(); + + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapperProcess.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapperProcess.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapperProcess.java new file mode 100644 index 0000000..424dd79 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/PlanMapperProcess.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.plan.mapper; + +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature; +import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignatureFactory; +import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.LinkGroup; + +public class PlanMapperProcess { + + private static class OpTreeSignatureMapper implements GroupTransformer { + + private OpTreeSignatureFactory cache = OpTreeSignatureFactory.newCache(); + + @Override + public void map(LinkGroup group) { + List<Operator> operators= group.getAll(Operator.class); + for (Operator op : operators) { + group.add(OpTreeSignature.of(op,cache)); + } + } + } + + public static void runPostProcess(PlanMapper planMapper) { + planMapper.runMapper(new OpTreeSignatureMapper()); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/RuntimeStatsSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/RuntimeStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/RuntimeStatsSource.java new file mode 100644 index 0000000..21a0678 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/RuntimeStatsSource.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan.mapper; + +import java.util.Optional; + +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.stats.OperatorStats; + +public interface RuntimeStatsSource extends StatsSource { + public Optional<OperatorStats> lookup(Operator<?> tsop); + +} http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java new file mode 100644 index 0000000..6f340b8 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan.mapper; + +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; + +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature; +import org.apache.hadoop.hive.ql.stats.OperatorStats; + +public class SimpleRuntimeStatsSource implements RuntimeStatsSource { + + private final PlanMapper pm; + + + public SimpleRuntimeStatsSource(PlanMapper pm) { + PlanMapperProcess.runPostProcess(pm); + this.pm = pm; + } + + @Override + public Optional<OperatorStats> lookup(Operator<?> op) { + try { + OpTreeSignature sig = OpTreeSignature.of(op); + List<OperatorStats> v = pm.lookupAll(OperatorStats.class, sig); + if (v.size() > 0) { + return Optional.of(v.get(0)); + } + return Optional.empty(); + } catch (NoSuchElementException | IllegalArgumentException iae) { + return Optional.empty(); + } + } + + @Override + public boolean canProvideStatsFor(Class<?> class1) { + if (Operator.class.isAssignableFrom(class1)) { + return true; + } + if (HiveFilter.class.isAssignableFrom(class1)) { + return true; + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java new file mode 100644 index 0000000..a4cb6e9 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan.mapper; + +public interface StatsSource { + + boolean canProvideStatsFor(Class<?> class1); + +} http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java new file mode 100644 index 0000000..2b0d23c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.reexec; + +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; + +/** + * Defines an interface for re-execution logics. + * + * FIXME: rethink methods. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface IReExecutionPlugin { + + /** + * Called when the {@link Driver} is being initialized + * + * The plugin may add hooks/etc to tap into the system. + */ + void initialize(Driver driver); + + /** + * Called before executing the query. + */ + void beforeExecute(int executionIndex, boolean explainReOptimization); + + /** + * The query have failed, does this plugin advises to re-execute it again? + */ + boolean shouldReExecute(int executionNum); + + /** + * The plugin should prepare for the re-compilaton of the query. + */ + void prepareToReExecute(); + + /** + * The query have failed; and have been recompiled - does this plugin advises to re-execute it again? 
+ */ + boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper); + + + +} http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java new file mode 100644 index 0000000..9303171 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.reexec; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.antlr.runtime.tree.Tree; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.QueryDisplay; +import org.apache.hadoop.hive.ql.QueryInfo; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.FetchTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.HiveParser; +import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook; +import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; +import org.apache.hadoop.hive.ql.plan.mapper.SimpleRuntimeStatsSource; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Enables to use re-execution logics. + * + * Covers the IDriver interface, handles query re-execution; and asks clear questions from the underlying re-execution plugins. 
+ */ +public class ReExecDriver implements IDriver { + + private class HandleReOptimizationExplain implements HiveSemanticAnalyzerHook { + + @Override + public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast) throws SemanticException { + if (ast.getType() == HiveParser.TOK_EXPLAIN) { + int childCount = ast.getChildCount(); + for (int i = 1; i < childCount; i++) { + if (ast.getChild(i).getType() == HiveParser.KW_REOPTIMIZATION) { + explainReOptimization = true; + ast.deleteChild(i); + break; + } + } + if (explainReOptimization && firstExecution()) { + Tree execTree = ast.getChild(0); + execTree.setParent(ast.getParent()); + ast.getParent().setChild(0, execTree); + return (ASTNode) execTree; + } + } + return ast; + } + + @Override + public void postAnalyze(HiveSemanticAnalyzerHookContext context, List<Task<? extends Serializable>> rootTasks) + throws SemanticException { + } + } + + private static final Logger LOG = LoggerFactory.getLogger(ReExecDriver.class); + private boolean explainReOptimization; + protected Driver coreDriver; + private QueryState queryState; + private String currentQuery; + private int executionIndex; + + private ArrayList<IReExecutionPlugin> plugins; + + @Override + public HiveConf getConf() { + return queryState.getConf(); + } + + public boolean firstExecution() { + return executionIndex == 0; + } + + public ReExecDriver(QueryState queryState, String userName, QueryInfo queryInfo, + ArrayList<IReExecutionPlugin> plugins) { + this.queryState = queryState; + coreDriver = new Driver(queryState, userName, queryInfo, null); + coreDriver.getHookRunner().addSemanticAnalyzerHook(new HandleReOptimizationExplain()); + this.plugins = plugins; + + for (IReExecutionPlugin p : plugins) { + p.initialize(coreDriver); + } + } + + @Override + public int compile(String string) { + return coreDriver.compile(string); + } + + @Override + public CommandProcessorResponse compileAndRespond(String statement) { + currentQuery = statement; + return 
coreDriver.compileAndRespond(statement); + } + + @Override + public QueryPlan getPlan() { + return coreDriver.getPlan(); + } + + @Override + public QueryDisplay getQueryDisplay() { + return coreDriver.getQueryDisplay(); + } + + @Override + public void setOperationId(String guid64) { + coreDriver.setOperationId(guid64); + } + + @Override + public CommandProcessorResponse run() { + executionIndex = 0; + int maxExecutuions = 1 + coreDriver.getConf().getIntVar(ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT); + + + while (true) { + executionIndex++; + for (IReExecutionPlugin p : plugins) { + p.beforeExecute(executionIndex, explainReOptimization); + } + coreDriver.getContext().setExecutionIndex(executionIndex); + LOG.info("Execution #{} of query", executionIndex); + CommandProcessorResponse cpr = coreDriver.run(); + + boolean shouldReExecute = explainReOptimization && executionIndex==1; + shouldReExecute |= cpr.getResponseCode() != 0 && shouldReExecute(); + + if (executionIndex >= maxExecutuions || !shouldReExecute) { + return cpr; + } + LOG.info("Preparing to re-execute query"); + prepareToReExecute(); + PlanMapper oldPlanMapper = coreDriver.getPlanMapper(); + CommandProcessorResponse compile_resp = coreDriver.compileAndRespond(currentQuery); + if (compile_resp.failed()) { + // FIXME: somehow place pointers that re-execution compilation have failed; the query have been successfully compiled before? + return compile_resp; + } + + PlanMapper newPlanMapper = coreDriver.getPlanMapper(); + if (!explainReOptimization && !shouldReExecuteAfterCompile(oldPlanMapper, newPlanMapper)) { + // FIXME: retain old error; or create a new one? 
+ return cpr; + } + } + } + + private boolean shouldReExecuteAfterCompile(PlanMapper oldPlanMapper, PlanMapper newPlanMapper) { + boolean ret = false; + for (IReExecutionPlugin p : plugins) { + ret |= p.shouldReExecute(executionIndex, oldPlanMapper, newPlanMapper); + } + return ret; + } + + private boolean shouldReExecute() { + boolean ret = false; + for (IReExecutionPlugin p : plugins) { + ret |= p.shouldReExecute(executionIndex); + } + return ret; + } + + @Override + public CommandProcessorResponse run(String command) { + CommandProcessorResponse r0 = compileAndRespond(command); + if (r0.getResponseCode() != 0) { + return r0; + } + return run(); + } + + protected void prepareToReExecute() { + for (IReExecutionPlugin p : plugins) { + p.prepareToReExecute(); + } + } + + @Override + public boolean getResults(List res) throws IOException { + return coreDriver.getResults(res); + } + + @Override + public void setMaxRows(int maxRows) { + coreDriver.setMaxRows(maxRows); + } + + @Override + public FetchTask getFetchTask() { + return coreDriver.getFetchTask(); + } + + @Override + public Schema getSchema() { + return coreDriver.getSchema(); + } + + @Override + public boolean isFetchingTable() { + return coreDriver.isFetchingTable(); + } + + @Override + public void resetFetch() throws IOException { + coreDriver.resetFetch(); + } + + @Override + public void close() { + coreDriver.close(); + } + + @Override + public void destroy() { + coreDriver.destroy(); + } + + @Override + public final Context getContext() { + return coreDriver.getContext(); + } + + @VisibleForTesting + public void setRuntimeStatsSource(SimpleRuntimeStatsSource statsSource) { + coreDriver.setRuntimeStatsSource(statsSource); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java new file mode 100644 index 0000000..4ee3c14 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.reexec; + +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; +import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.hooks.HookContext.HookType; +import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; + +/** + * Re-Executes a query only adding an extra overlay + */ +public class ReExecutionOverlayPlugin implements IReExecutionPlugin { + + private Driver driver; + private Map<String, String> subtree; + + class LocalHook implements ExecuteWithHookContext { + + @Override + public void run(HookContext hookContext) throws Exception { + if (hookContext.getHookType() == HookType.ON_FAILURE_HOOK) { + Throwable exception = hookContext.getException(); + if (exception != null) { + if (exception.getMessage().contains("Vertex failed,")) { + retryPossible = true; + } + } + } + } + } + + @Override + public void initialize(Driver driver) { + this.driver = driver; + driver.getHookRunner().addOnFailureHook(new LocalHook()); + HiveConf conf = driver.getConf(); + subtree = conf.subtree("reexec.overlay"); + } + + private boolean retryPossible; + + @Override + public void prepareToReExecute() { + HiveConf conf = driver.getConf(); + conf.verifyAndSetAll(subtree); + } + + @Override + public boolean shouldReExecute(int executionNum) { + return executionNum == 1 && !subtree.isEmpty() && retryPossible; + } + + @Override + public boolean shouldReExecute(int executionNum, PlanMapper pm1, PlanMapper pm2) { + return executionNum == 1; + } + + @Override + public void beforeExecute(int executionIndex, boolean explainReOptimization) { + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java new file mode 100644 index 0000000..7078587 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.reexec; + +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; +import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; +import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.hooks.HookContext.HookType; +import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; +import org.apache.hadoop.hive.ql.plan.mapper.SimpleRuntimeStatsSource; +import org.apache.hadoop.hive.ql.stats.OperatorStatsReaderHook; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class ReOptimizePlugin implements IReExecutionPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(ReOptimizePlugin.class); + + private boolean retryPossible; + + private Driver coreDriver; + + private OperatorStatsReaderHook statsReaderHook; + + class LocalHook implements ExecuteWithHookContext { + + @Override + public void run(HookContext hookContext) throws Exception { + if (hookContext.getHookType() == HookType.ON_FAILURE_HOOK) { + Throwable exception = hookContext.getException(); + if (exception != null) { + { + String message = exception.getMessage(); + if (message != null) { + boolean isOOM = message.contains(MapJoinMemoryExhaustionError.class.getName()) + || message.contains(OutOfMemoryError.class.getName()); + if (message.contains("Vertex failed,") && isOOM) { + retryPossible = true; + } + System.out.println(exception); + } + } + } + } + } + } + + @Override + public void initialize(Driver driver) { + coreDriver = driver; + coreDriver.getHookRunner().addOnFailureHook(new LocalHook()); + statsReaderHook = new OperatorStatsReaderHook(); + coreDriver.getHookRunner().addOnFailureHook(statsReaderHook); + coreDriver.getHookRunner().addPostHook(statsReaderHook); + // 
statsReaderHook.setCollectOnSuccess(true); + statsReaderHook.setCollectOnSuccess( + driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS)); + } + + @Override + public boolean shouldReExecute(int executionNum) { + return retryPossible; + } + + @Override + public void prepareToReExecute() { + statsReaderHook.setCollectOnSuccess(true); + PlanMapper pm = coreDriver.getContext().getPlanMapper(); + coreDriver.setRuntimeStatsSource(new SimpleRuntimeStatsSource(pm)); + retryPossible = false; + } + + @Override + public boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper) { + return planDidChange(oldPlanMapper, newPlanMapper); + } + + private boolean planDidChange(PlanMapper pmL, PlanMapper pmR) { + List<Operator> opsL = getRootOps(pmL); + List<Operator> opsR = getRootOps(pmR); + for (Iterator<Operator> itL = opsL.iterator(); itL.hasNext();) { + Operator<?> opL = itL.next(); + for (Iterator<Operator> itR = opsR.iterator(); itR.hasNext();) { + Operator<?> opR = itR.next(); + if (opL.logicalEqualsTree(opR)) { + itL.remove(); + itR.remove(); + break; + } + } + } + return opsL.isEmpty() && opsR.isEmpty(); + } + + private List<Operator> getRootOps(PlanMapper pmL) { + List<Operator> ops = pmL.getAll(Operator.class); + for (Iterator<Operator> iterator = ops.iterator(); iterator.hasNext();) { + Operator o = iterator.next(); + if (o.getNumChild() != 0) { + iterator.remove(); + } + } + return ops; + } + + @Override + public void beforeExecute(int executionIndex, boolean explainReOptimization) { + if (explainReOptimization) { + statsReaderHook.setCollectOnSuccess(true); + } + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java 
new file mode 100644 index 0000000..52e18a8 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/OperatorStats.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.stats; + +public class OperatorStats { + private final String operatorId; + private long outputRecords; + + public OperatorStats(final String opId) { + this.operatorId = opId; + this.outputRecords = -1; + } + + public long getOutputRecords() { + return outputRecords; + } + + public void setOutputRecords(final long outputRecords) { + this.outputRecords = outputRecords; + } + + public String getOperatorId() { + return operatorId; + } + + @Override + public String toString() { + return String.format("OperatorStats %s records: %d", operatorId, outputRecords); + } +}