IMPALA-8021: Add estimated cardinality to EXPLAIN output

Cardinality is vital to understanding why a plan has the form it does, yet the planner normally emits cardinality information only for the detailed levels. Unfortunately, most query profiles we see are at the standard level without this information (except in the summary table), making it hard to understand what happened.

This patch adds cardinality to the standard EXPLAIN output. It also changes the displayed cardinality value to abbreviated "metric" form: 1.23K instead of 1234, etc.

Changing the EXPLAIN output has a huge impact on PlannerTest: all the "golden" test files must change. To avoid doing this twice, this patch also includes:

IMPALA-7919: Add predicates line in plan output for partition key predicates

This is also the time to include:

IMPALA-8022: Add cardinality checks to PlannerTest

The comparison code was changed to allow a set of validators, one of which compares cardinality to ensure it is within 5% of the expected value. This should ensure we don't change estimates unintentionally.

While many planner tests are concerned with cardinality, many others are not. Testing showed that the cardinality is actually unstable within some tests. For such tests, this patch adds filters to ignore cardinality. The filter is enabled by default (for backward compatibility) but disabled (to allow cardinality verification) for the critical tests.

Rebasing the tests was complicated by a bug in the error-matching code, so this patch also fixes:

IMPALA-8023: Fix PlannerTest to handle error lines consistently

Now, the error output written to the output "save results" file matches that expected in the "golden" file -- no more handling these specially.

Testing:
* Added cardinality verification.
* Reran all FE tests.
* Rebased all PlannerTest .test files.
* Adjusted the metadata/test_explain.py test to handle the changed EXPLAIN output.

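To make the abbreviated "metric" form and the 5% check concrete, here is a minimal, self-contained sketch. It is not the code from this patch: the class CardinalitySketch and the methods format, parse and withinTolerance are hypothetical names chosen for illustration, and the sketch assumes the tolerance is measured relative to the expected value.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative sketch only; the names here are hypothetical, not Impala's API. */
final class CardinalitySketch {
  static final long KILO = 1000L;
  static final long MEGA = KILO * 1000;
  static final long GIGA = MEGA * 1000;
  static final long TERA = GIGA * 1000;

  // A number with an optional fraction, followed by an optional unit suffix.
  private static final Pattern METRIC =
      Pattern.compile("(\\d+(?:\\.\\d+)?)([TGMK]?)", Pattern.CASE_INSENSITIVE);

  /** Formats a count in power-of-1000 units with two decimals, e.g. 1234 as 1.23K. */
  static String format(long value) {
    if (value >= TERA) return String.format(Locale.ROOT, "%.2fT", (double) value / TERA);
    if (value >= GIGA) return String.format(Locale.ROOT, "%.2fG", (double) value / GIGA);
    if (value >= MEGA) return String.format(Locale.ROOT, "%.2fM", (double) value / MEGA);
    if (value >= KILO) return String.format(Locale.ROOT, "%.2fK", (double) value / KILO);
    return Long.toString(value);
  }

  /** Parses a metric-encoded string; returns -1 if the text is not a metric value. */
  static double parse(String text) {
    Matcher m = METRIC.matcher(text);
    if (!m.matches()) return -1;
    double value = Double.parseDouble(m.group(1));
    switch (m.group(2).toUpperCase(Locale.ROOT)) {
      case "K": return value * KILO;
      case "M": return value * MEGA;
      case "G": return value * GIGA;
      case "T": return value * TERA;
      default: return value;  // no unit suffix
    }
  }

  /**
   * Returns true if 'actual' is within 'tolerance' (e.g. 0.05 for 5%) of 'expected'.
   * Non-numeric values such as "unavailable" must match exactly.
   */
  static boolean withinTolerance(String expected, String actual, double tolerance) {
    double e = parse(expected);
    double a = parse(actual);
    if (e < 0 || a < 0) return expected.equals(actual);
    if (e == 0) return a == 0;
    return Math.abs(a - e) / e <= tolerance;
  }

  public static void main(String[] args) {
    System.out.println(format(1234));  // prints 1.23K
    System.out.println(withinTolerance("1.23K", "1.25K", 0.05));
  }
}
```

Because the abbreviated form keeps only two decimals, an exact string comparison of cardinalities would be both lossy and brittle; comparing decoded values within a tolerance is one way to catch unintended estimate changes while ignoring small rounding differences.
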
Change-Id: Ie9aa2d715b04cbb279aaffec8c5692686562d986
Reviewed-on: http://gerrit.cloudera.org:8080/12136
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a7ea86b7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a7ea86b7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a7ea86b7

Branch: refs/heads/master
Commit: a7ea86b768247ff5388174445e7c91736b99c2de
Parents: 3a3ab7f
Author: paul-rogers <prog...@cloudera.com>
Authored: Thu Dec 27 17:55:16 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Sat Jan 12 04:03:26 2019 +0000

----------------------------------------------------------------------
.../apache/impala/analysis/PartitionSet.java | 2 +-
.../org/apache/impala/common/PrintUtils.java | 74 +-
.../org/apache/impala/planner/EmptySetNode.java | 2 +
.../org/apache/impala/planner/ExchangeNode.java | 11 +
.../impala/planner/HdfsPartitionPruner.java | 12 +-
.../org/apache/impala/planner/HdfsScanNode.java | 48 +-
.../impala/planner/NestedLoopJoinNode.java | 1 +
.../org/apache/impala/planner/PlanNode.java | 28 +-
.../impala/planner/SingleNodePlanner.java | 11 +-
.../apache/impala/planner/CardinalityTest.java | 9 +
.../org/apache/impala/planner/PlannerTest.java | 53 +-
.../apache/impala/planner/PlannerTestBase.java | 15 +-
.../org/apache/impala/testutil/TestUtils.java | 55 +-
.../queries/PlannerTest/aggregation.test | 269 +++-
.../queries/PlannerTest/analytic-fns.test | 423 +++++-
.../PlannerTest/complex-types-file-formats.test | 37 +-
.../queries/PlannerTest/conjunct-ordering.test | 17 +
.../queries/PlannerTest/constant-folding.test | 38 +-
.../PlannerTest/constant-propagation.test | 49 +-
.../queries/PlannerTest/constant.test | 2 +
.../queries/PlannerTest/data-source-tables.test | 9 +-
.../queries/PlannerTest/ddl.test | 48 +
.../default-join-distr-mode-broadcast.test | 13 +-
.../default-join-distr-mode-shuffle.test | 13 +-
.../queries/PlannerTest/disable-codegen.test | 38 +-
.../PlannerTest/disable-preaggregations.test | 8 +
.../queries/PlannerTest/distinct-estimate.test | 13 +
.../queries/PlannerTest/distinct.test | 123 ++
.../queries/PlannerTest/empty.test | 67 +-
.../PlannerTest/fk-pk-join-detection.test | 72 +-
.../queries/PlannerTest/hbase.test | 76 +-
.../queries/PlannerTest/hdfs.test | 226 +++-
.../queries/PlannerTest/implicit-joins.test | 84 ++
.../queries/PlannerTest/inline-view-limit.test | 109 ++
.../queries/PlannerTest/inline-view.test | 209 +++
.../queries/PlannerTest/insert-sort-by.test | 72 +
.../queries/PlannerTest/insert.test | 129 ++
.../queries/PlannerTest/join-order.test | 285 ++++
.../queries/PlannerTest/joins.test | 441 +++++++
.../queries/PlannerTest/kudu-delete.test | 14 +
.../queries/PlannerTest/kudu-selectivity.test | 6 +-
.../queries/PlannerTest/kudu-update.test | 24 +
.../queries/PlannerTest/kudu-upsert.test | 50 +
.../queries/PlannerTest/kudu.test | 82 +-
.../queries/PlannerTest/lineage.test | 192 ++-
.../queries/PlannerTest/max-row-size.test | 110 +-
.../PlannerTest/mem-limit-broadcast-join.test | 3 +
.../PlannerTest/min-max-runtime-filters.test | 22 +-
.../queries/PlannerTest/mt-dop-validation.test | 40 +-
.../PlannerTest/multiple-distinct-limit.test | 28 +
.../multiple-distinct-materialization.test | 141 ++
.../multiple-distinct-predicates.test | 60 +
.../queries/PlannerTest/multiple-distinct.test | 160 +++
.../queries/PlannerTest/nested-collections.test | 575 +++++++-
.../queries/PlannerTest/nested-loop-join.test | 52 +
.../queries/PlannerTest/order.test | 298 +++++
.../queries/PlannerTest/outer-joins.test | 173 +++
.../PlannerTest/parquet-filtering-disabled.test | 32 +-
.../queries/PlannerTest/parquet-filtering.test | 20 +-
.../queries/PlannerTest/parquet-stats-agg.test | 124 +-
.../PlannerTest/partition-key-scans.test | 57 +
.../queries/PlannerTest/partition-pruning.test | 1 +
.../PlannerTest/predicate-propagation.test | 235 +++-
.../PlannerTest/resource-requirements.test | 965 +++++++-------
.../PlannerTest/runtime-filter-propagation.test | 284 +++-
.../runtime-filter-query-options.test | 113 ++
.../PlannerTest/shuffle-by-distinct-exprs.test | 55 +
.../queries/PlannerTest/small-query-opt.test | 53 +-
.../PlannerTest/sort-expr-materialization.test | 32 +-
.../PlannerTest/spillable-buffer-sizing.test | 192 +--
.../queries/PlannerTest/subquery-rewrite.test | 508 +++++++
.../queries/PlannerTest/tablesample.test | 12 +-
.../PlannerTest/topn-bytes-limit-small.test | 12 +
.../queries/PlannerTest/topn-bytes-limit.test | 6 +-
.../queries/PlannerTest/topn.test | 73 ++
.../queries/PlannerTest/tpcds-all.test | 1234 ++++++++++++++++++
.../queries/PlannerTest/tpch-all.test | 658 ++++++++++
.../queries/PlannerTest/tpch-kudu.test | 200 +++
.../queries/PlannerTest/tpch-nested.test | 701 +++++++++-
.../queries/PlannerTest/tpch-views.test | 200 +++
.../queries/PlannerTest/union.test | 913 ++++++++++++-
.../queries/PlannerTest/values.test | 12 +
.../queries/PlannerTest/views.test | 88 ++
.../queries/PlannerTest/with-clause.test | 138 ++
.../queries/QueryTest/corrupt-stats.test | 14 +-
.../queries/QueryTest/explain-level1.test | 3 +
.../queries/QueryTest/explain-level2.test | 30 +-
.../queries/QueryTest/explain-level3.test | 24 +-
.../queries/QueryTest/stats-extrapolation.test | 11 +-
tests/metadata/test_explain.py | 6 +-
90 files changed, 11078 insertions(+), 1149 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java index 53a321f..e9972bb 100644 --- a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java +++ b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java @@ -86,7 +86,7 @@ public class PartitionSet extends PartitionSpecBase { try { HdfsPartitionPruner pruner = new HdfsPartitionPruner(desc); - partitions_ = pruner.prunePartitions(analyzer, transformedConjuncts, true); + partitions_ = pruner.prunePartitions(analyzer, transformedConjuncts, true).first; } catch (ImpalaException e) { if (e instanceof AnalysisException) throw (AnalysisException) e; throw new AnalysisException("Partition expr evaluation failed in the backend.", e); http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/common/PrintUtils.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/common/PrintUtils.java b/fe/src/main/java/org/apache/impala/common/PrintUtils.java index f4814b5..2636914 100644 --- a/fe/src/main/java/org/apache/impala/common/PrintUtils.java +++ b/fe/src/main/java/org/apache/impala/common/PrintUtils.java @@ -24,6 +24,8 @@ import static org.apache.impala.common.ByteUnits.PETABYTE; import static org.apache.impala.common.ByteUnits.TERABYTE; import java.text.DecimalFormat; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.text.WordUtils; @@ -49,6 +51,73 @@ public class PrintUtils { return bytes + "B"; } + public static final long KILO = 1000; + public static final long MEGA = KILO * 1000; + public static final long GIGA = MEGA * 1000; + public static final long TERA = GIGA * 1000; + + /** + * Print a value using simple metric (power of 1000) units. Units are + * (none), K, M, G or T. Value has two digits past the decimal point. 
+ */ + public static String printMetric(long value) { + double result = value; + if (value >= TERA) return new DecimalFormat(".00T").format(result / TERA); + if (value >= GIGA) return new DecimalFormat(".00G").format(result / GIGA); + if (value >= MEGA) return new DecimalFormat(".00M").format(result / MEGA); + if (value >= KILO) return new DecimalFormat(".00K").format(result / KILO); + return Long.toString(value); + } + + /** + * Pattern to use when searching for a metric-encoded value. + */ + public static final String METRIC_REGEX = "(\\d+(?:.\\d+)?)([TGMK]?)"; + + /** + * Pattern to use when searching for or parsing a metric-encoded value. + */ + public static final Pattern METRIC_PATTERN = + Pattern.compile(METRIC_REGEX, Pattern.CASE_INSENSITIVE); + + /** + * Decode a value metric-encoded using {@link #printMetric(long)}. + * @param value metric-encoded string + * @return approximate numeric value, or -1 if the value is invalid + * (metric encoded strings can never be negative normally) + */ + public static double decodeMetric(String value) { + Matcher m = METRIC_PATTERN.matcher(value); + if (! m.matches()) return -1; + return decodeMetric(m.group(1), m.group(2)); + } + + /** + * Decode a metric-encoded string already parsed into parts. + * @param valueStr numeric part of the value + * @param units units part of the value + * @return approximate numeric value + */ + // Yes, "PrintUtils" is an odd place for a parse function, but + // best to keep the formatter and parser together. + public static double decodeMetric(String valueStr, String units) { + double value = Double.parseDouble(valueStr); + switch (units.toUpperCase()) { + case "": + return value; + case "K": + return value * KILO; + case "M": + return value * MEGA; + case "G": + return value * GIGA; + case "T": + return value * TERA; + default: + return -1; + } + } + /** * Same as printBytes() except 0 decimal points are shown for MB and KB. 
*/ @@ -65,9 +134,8 @@ public class PrintUtils { return bytes + "B"; } - public static String printCardinality(String prefix, long cardinality) { - return prefix + "cardinality=" + - ((cardinality != -1) ? String.valueOf(cardinality) : "unavailable"); + public static String printCardinality(long cardinality) { + return (cardinality != -1) ? printMetric(cardinality) : "unavailable"; } public static String printNumHosts(String prefix, long numHosts) { http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java index 7b3ea33..55830c2 100644 --- a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java +++ b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java @@ -76,4 +76,6 @@ public class EmptySetNode extends PlanNode { msg.node_type = TPlanNodeType.EMPTY_SET_NODE; } + @Override + protected boolean displayCardinality(TExplainLevel detailLevel) { return false; } } http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java index 356ae6b..a140dc2 100644 --- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java @@ -173,6 +173,17 @@ public class ExchangeNode extends PlanNode { return output.toString(); } + /** + * An Exchange simply moves rows over the network: its row width + * and cardinality are identical to its input. So, for standard + * level, there is no need to repeat these values. Retained in + * higher levels for backward compatibility. 
+ */ + @Override + protected boolean displayCardinality(TExplainLevel detailLevel) { + return detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal(); + } + @Override protected String getDisplayLabelDetail() { // For the non-fragmented explain levels, print the data partition http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java index 9fb204a..e9deb44 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java @@ -44,6 +44,7 @@ import org.apache.impala.catalog.FeFsTable; import org.apache.impala.catalog.PrunablePartition; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.Pair; import org.apache.impala.rewrite.BetweenToCompoundRule; import org.apache.impala.rewrite.ExprRewriter; import org.slf4j.Logger; @@ -97,16 +98,20 @@ public class HdfsPartitionPruner { /** * Return a list of partitions left after applying the conjuncts. Please note - * that conjuncts used for filtering will be removed from the list 'conjuncts'. + * that conjuncts used for filtering will be removed from the list 'conjuncts' and + * returned as the second item in the returned Pair. These expressions can be + * shown in the EXPLAIN output. + * * If 'allowEmpty' is False, empty partitions are not returned. */ - public List<? extends FeFsPartition> prunePartitions( + public Pair<List<? extends FeFsPartition>, List<Expr>> prunePartitions( Analyzer analyzer, List<Expr> conjuncts, boolean allowEmpty) throws ImpalaException { // Start with creating a collection of partition filters for the applicable conjuncts. 
List<HdfsPartitionFilter> partitionFilters = new ArrayList<>(); // Conjuncts that can be evaluated from the partition key values. List<Expr> simpleFilterConjuncts = new ArrayList<>(); + List<Expr> partitionConjuncts = new ArrayList<>(); // Simple predicates (e.g. binary predicates of the form // <SlotRef> <op> <LiteralExpr>) can be used to derive lists @@ -128,6 +133,7 @@ public class HdfsPartitionPruner { } else { partitionFilters.add(new HdfsPartitionFilter(clonedConjunct, tbl_, analyzer)); } + partitionConjuncts.add(conjunct); it.remove(); } } @@ -168,7 +174,7 @@ public class HdfsPartitionPruner { } })); } - return results; + return new Pair<>(results, partitionConjuncts); } /** http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 4f9a961..cc31ce4 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -49,9 +49,9 @@ import org.apache.impala.analysis.TupleDescriptor; import org.apache.impala.analysis.TupleId; import org.apache.impala.catalog.Column; import org.apache.impala.catalog.ColumnStats; -import org.apache.impala.catalog.HdfsCompression; import org.apache.impala.catalog.FeFsPartition; import org.apache.impala.catalog.FeFsTable; +import org.apache.impala.catalog.HdfsCompression; import org.apache.impala.catalog.HdfsFileFormat; import org.apache.impala.catalog.HdfsPartition.FileBlock; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; @@ -68,8 +68,8 @@ import org.apache.impala.fb.FbFileBlock; import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TExpr; -import org.apache.impala.thrift.THdfsFileSplit; import 
org.apache.impala.thrift.TFileSplitGeneratorSpec; +import org.apache.impala.thrift.THdfsFileSplit; import org.apache.impala.thrift.THdfsScanNode; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TPlanNode; @@ -91,8 +91,6 @@ import com.google.common.base.Objects; import com.google.common.base.Objects.ToStringHelper; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; /** * Scan of a single table. @@ -261,10 +259,13 @@ public class HdfsScanNode extends ScanNode { // parquet::Statistics. private TupleDescriptor minMaxTuple_; - // Slot that is used to record the Parquet metatdata for the count(*) aggregation if + // Slot that is used to record the Parquet metadata for the count(*) aggregation if // this scan node has the count(*) optimization enabled. private SlotDescriptor countStarSlot_ = null; + // Conjuncts used to trim the set of partitions passed to this node. + // Used only to display EXPLAIN information. + private final List<Expr> partitionConjuncts_; /** * Construct a node to scan given data files into tuples described by 'desc', * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and @@ -272,12 +273,14 @@ public class HdfsScanNode extends ScanNode { * class comments above for details. */ public HdfsScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts, - List<? extends FeFsPartition> partitions, TableRef hdfsTblRef, AggregateInfo aggInfo) { + List<? 
extends FeFsPartition> partitions, TableRef hdfsTblRef, + AggregateInfo aggInfo, List<Expr> partConjuncts) { super(id, desc, "SCAN HDFS"); Preconditions.checkState(desc.getTable() instanceof FeFsTable); tbl_ = (FeFsTable)desc.getTable(); conjuncts_ = conjuncts; partitions_ = partitions; + partitionConjuncts_ = partConjuncts; sampleParams_ = hdfsTblRef.getSampleParams(); replicaPreference_ = hdfsTblRef.getReplicaPreference(); randomReplica_ = hdfsTblRef.getRandomReplica(); @@ -1224,20 +1227,27 @@ public class HdfsScanNode extends ScanNode { } output.append("]\n"); if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { + if (partitionConjuncts_ != null && !partitionConjuncts_.isEmpty()) { + output.append(detailPrefix) + .append(String.format("partition predicates: %s\n", + getExplainString(partitionConjuncts_, detailLevel))); + } if (tbl_.getNumClusteringCols() == 0) numPartitions_ = 1; - output.append(String.format("%spartitions=%s/%s files=%s size=%s", detailPrefix, - numPartitions_, table.getPartitions().size(), totalFiles_, - PrintUtils.printBytes(totalBytes_))); - output.append("\n"); + output.append(detailPrefix) + .append(String.format("partitions=%d/%d files=%d size=%s\n", + numPartitions_, table.getPartitions().size(), totalFiles_, + PrintUtils.printBytes(totalBytes_))); if (!conjuncts_.isEmpty()) { - output.append(String.format("%spredicates: %s\n", detailPrefix, + output.append(detailPrefix) + .append(String.format("predicates: %s\n", getExplainString(conjuncts_, detailLevel))); } if (!collectionConjuncts_.isEmpty()) { for (Map.Entry<TupleDescriptor, List<Expr>> entry: collectionConjuncts_.entrySet()) { String alias = entry.getKey().getAlias(); - output.append(String.format("%spredicates on %s: %s\n", detailPrefix, alias, + output.append(detailPrefix) + .append(String.format("predicates on %s: %s\n", alias, getExplainString(entry.getValue(), detailLevel))); } } @@ -1255,14 +1265,16 @@ public class HdfsScanNode extends ScanNode { } else if 
(extrapolatedNumRows_ == -1) { extrapRows = "unavailable"; } - output.append(String.format("%sextrapolated-rows=%s", detailPrefix, extrapRows)); + output.append(detailPrefix) + .append(String.format("extrapolated-rows=%s", extrapRows)); output.append(String.format(" max-scan-range-rows=%s", maxScanRangeNumRows_ == -1 ? "unavailable" : maxScanRangeNumRows_)); output.append("\n"); if (numScanRangesNoDiskIds_ > 0) { - output.append(String.format("%smissing disk ids: " + output.append(detailPrefix) + .append(String.format("missing disk ids: " + "partitions=%s/%s files=%s/%s scan ranges %s/%s\n", - detailPrefix, numPartitionsNoDiskIds_, numPartitions_, numFilesNoDiskIds_, + numPartitionsNoDiskIds_, numPartitions_, numFilesNoDiskIds_, totalFiles_, numScanRangesNoDiskIds_, scanRangeSpecs_.getConcrete_rangesSize() + generatedScanRangeCount_)); } @@ -1283,10 +1295,12 @@ public class HdfsScanNode extends ScanNode { TupleDescriptor tupleDesc = entry.getKey(); List<Expr> exprs = entry.getValue(); if (tupleDesc == getTupleDesc()) { - output.append(String.format("%sparquet statistics predicates: %s\n", prefix, + output.append(prefix) + .append(String.format("parquet statistics predicates: %s\n", getExplainString(exprs, detailLevel))); } else { - output.append(String.format("%sparquet statistics predicates on %s: %s\n", prefix, + output.append(prefix) + .append(String.format("parquet statistics predicates on %s: %s\n", tupleDesc.getAlias(), getExplainString(exprs, detailLevel))); } } http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java index 1ecd1c5..afa75e7 100644 --- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java +++ 
b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java @@ -30,6 +30,7 @@ import org.apache.impala.thrift.TNestedLoopJoinNode; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; + import com.google.common.base.Objects; import com.google.common.base.Preconditions; http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/PlanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index a329dc2..520cd8f 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -320,6 +320,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { // Output cardinality, cost estimates and tuple Ids only when explain plan level // is extended or above. + boolean displayCardinality = displayCardinality(detailLevel); if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { // Print resource profile. expBuilder.append(detailPrefix); @@ -334,9 +335,18 @@ abstract public class PlanNode extends TreeNode<PlanNode> { expBuilder.append(tupleId.asInt() + nullIndicator); if (i + 1 != tupleIds_.size()) expBuilder.append(","); } - expBuilder.append(" row-size=" + PrintUtils.printBytes(Math.round(avgRowSize_))); - expBuilder.append(PrintUtils.printCardinality(" ", cardinality_)); - expBuilder.append("\n"); + expBuilder.append(displayCardinality ? " " : "\n"); + } + // Output cardinality: in standard and above levels. + // In standard, on a line by itself (if wanted). In extended, on + // a line with tuple ids. 
+ if (displayCardinality) { + if (detailLevel == TExplainLevel.STANDARD) expBuilder.append(detailPrefix); + expBuilder.append("row-size=") + .append(PrintUtils.printBytes(Math.round(avgRowSize_))) + .append(" cardinality=") + .append(PrintUtils.printCardinality(cardinality_)) + .append("\n"); } if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { @@ -353,7 +363,6 @@ abstract public class PlanNode extends TreeNode<PlanNode> { } } - // Print the children. Do not traverse into the children of an Exchange node to // avoid crossing fragment boundaries. if (traverseChildren) { @@ -386,6 +395,17 @@ abstract public class PlanNode extends TreeNode<PlanNode> { } /** + * Per-node setting whether to include cardinality in the node overview. + * Some nodes omit cardinality because either a) it is not needed + * (Empty set, Exchange), or b) it is printed by the node itself (HDFS scan.) + * @return true if cardinality should be included in the generic + * node details, false if it should be omitted. + */ + protected boolean displayCardinality(TExplainLevel detailLevel) { + return detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal(); + } + + /** * Return the node-specific details. * Subclass should override this function. * Each line should be prefixed by detailPrefix. http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index 53b224e..a31bb50 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -1280,7 +1280,9 @@ public class SingleNodePlanner { // Do partition pruning before deciding which slots to materialize because we might // end up removing some predicates. 
HdfsPartitionPruner pruner = new HdfsPartitionPruner(tupleDesc); - List<? extends FeFsPartition> partitions = pruner.prunePartitions(analyzer, conjuncts, false); + Pair<List<? extends FeFsPartition>, List<Expr>> pair = + pruner.prunePartitions(analyzer, conjuncts, false); + List<? extends FeFsPartition> partitions = pair.first; // Mark all slots referenced by the remaining conjuncts as materialized. analyzer.materializeSlots(conjuncts); @@ -1323,9 +1325,9 @@ public class SingleNodePlanner { unionNode.init(analyzer); return unionNode; } else { - ScanNode scanNode = + HdfsScanNode scanNode = new HdfsScanNode(ctx_.getNextNodeId(), tupleDesc, conjuncts, partitions, - hdfsTblRef, aggInfo); + hdfsTblRef, aggInfo, pair.second); scanNode.init(analyzer); return scanNode; } @@ -1409,7 +1411,6 @@ public class SingleNodePlanner { ((HBaseScanNode)scanNode).setKeyRanges(keyRanges); scanNode.addConjuncts(conjuncts); scanNode.init(analyzer); - return scanNode; } @@ -1424,7 +1425,7 @@ public class SingleNodePlanner { * - for outer joins: same type of conjuncts as inner joins, but only from the * ON or USING clause * Predicates that are redundant based on equivalence classes are intentionally - * returneded by this function because the removal of redundant predicates and the + * returned by this function because the removal of redundant predicates and the * creation of new predicates for enforcing slot equivalences go hand-in-hand * (see analyzer.createEquivConjuncts()). 
*/ http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java index da59860..3bbb903 100644 --- a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java +++ b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java @@ -93,6 +93,15 @@ public class CardinalityTest extends PlannerTestBase { "SELECT COUNT(*) FROM functional.alltypes GROUP BY bool_col", 2); } + @Test + public void testNullColumnJoinCardinality() throws ImpalaException { + // IMPALA-7565: Make sure there is no division by zero during cardinality calculation + // in a many to many join on null columns (ndv = 0). + String query = "select * from functional.nulltable t1 " + + "inner join [shuffle] functional.nulltable t2 on t1.d = t2.d"; + checkCardinality(query, 1, 1); + } + /** * Joins should multiply out cardinalities. 
*/ http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/test/java/org/apache/impala/planner/PlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index c8dbf1f..683f702 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -31,6 +31,7 @@ import org.apache.impala.common.ImpalaException; import org.apache.impala.common.RuntimeEnv; import org.apache.impala.service.Frontend.PlanCtx; import org.apache.impala.testutil.TestUtils; +import org.apache.impala.testutil.TestUtils.IgnoreValueFilter; import org.apache.impala.thrift.TExecRequest; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TJoinDistributionMode; @@ -75,7 +76,8 @@ public class PlannerTest extends PlannerTestBase { @Test public void testEmpty() { - runPlannerTestFile("empty"); + runPlannerTestFile("empty", + ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY)); } @Test @@ -163,29 +165,34 @@ public class PlannerTest extends PlannerTestBase { @Test public void testJoins() { - runPlannerTestFile("joins"); + runPlannerTestFile("joins", + ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY)); } @Test public void testJoinOrder() { - runPlannerTestFile("join-order"); + runPlannerTestFile("join-order", + ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY)); } @Test public void testOuterJoins() { - runPlannerTestFile("outer-joins"); + runPlannerTestFile("outer-joins", + ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY)); } @Test public void testImplicitJoins() { - runPlannerTestFile("implicit-joins"); + runPlannerTestFile("implicit-joins", + ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY)); } @Test public void testFkPkJoinDetection() { // The FK/PK detection result is included in EXTENDED or higher. 
runPlannerTestFile("fk-pk-join-detection", - ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN)); + ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN, + PlannerTestOption.VALIDATE_CARDINALITY)); } @Test @@ -278,7 +285,8 @@ public class PlannerTest extends PlannerTestBase { public void testTpch() { runPlannerTestFile("tpch-all", "tpch", ImmutableSet.of(PlannerTestOption.INCLUDE_RESOURCE_HEADER, - PlannerTestOption.VALIDATE_RESOURCES)); + PlannerTestOption.VALIDATE_RESOURCES, + PlannerTestOption.VALIDATE_CARDINALITY)); } @Test @@ -298,7 +306,8 @@ public class PlannerTest extends PlannerTestBase { public void testTpchNested() { runPlannerTestFile("tpch-nested", "tpch_nested_parquet", ImmutableSet.of(PlannerTestOption.INCLUDE_RESOURCE_HEADER, - PlannerTestOption.VALIDATE_RESOURCES)); + PlannerTestOption.VALIDATE_RESOURCES, + PlannerTestOption.VALIDATE_CARDINALITY)); } @Test @@ -539,11 +548,16 @@ public class PlannerTest extends PlannerTestBase { } @Test - public void testDefaultJoinDistributionMode() { + public void testDefaultJoinDistributionBroadcastMode() { TQueryOptions options = defaultQueryOptions(); Preconditions.checkState( options.getDefault_join_distribution_mode() == TJoinDistributionMode.BROADCAST); runPlannerTestFile("default-join-distr-mode-broadcast", options); + } + + @Test + public void testDefaultJoinDistributionShuffleMode() { + TQueryOptions options = defaultQueryOptions(); options.setDefault_join_distribution_mode(TJoinDistributionMode.SHUFFLE); runPlannerTestFile("default-join-distr-mode-shuffle", options); } @@ -707,12 +721,21 @@ public class PlannerTest extends PlannerTestBase { 8 * 1024 * 1024); } + /** + * Verify that various expected-result filters work on a + * variety of sample input lines. + */ @Test - public void testNullColumnJoinCardinality() throws ImpalaException { - // IMPALA-7565: Make sure there is no division by zero during cardinality calculation - // in a many to many join on null columns (ndv = 0). 
- String query = "select * from functional.nulltable t1 " - + "inner join [shuffle] functional.nulltable t2 on t1.d = t2.d"; - checkCardinality(query, 1, 1); + public void testFilters() { + IgnoreValueFilter filter = TestUtils.CARDINALITY_FILTER; + assertEquals(" foo=bar cardinality=", + filter.transform(" foo=bar cardinality=10")); + assertEquals(" foo=bar cardinality=", + filter.transform(" foo=bar cardinality=10.3K")); + assertEquals(" foo=bar cardinality=", + filter.transform(" foo=bar cardinality=unavailable")); + filter = TestUtils.ROW_SIZE_FILTER; + assertEquals(" row-size= cardinality=10.3K", + filter.transform(" row-size=10B cardinality=10.3K")); } } http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java index 7db5d37..d9e168f 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java @@ -74,8 +74,8 @@ import org.apache.impala.util.ExecutorMembershipSnapshot; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduScanToken; import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Before; +import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -357,7 +357,8 @@ public class PlannerTestBase extends FrontendTestBase { private void handleException(String query, String expectedErrorMsg, StringBuilder errorLog, StringBuilder actualOutput, Throwable e) { - actualOutput.append(e.toString() + "\n"); + String actualErrorMsg = e.getClass().getSimpleName() + ": " + e.getMessage(); + actualOutput.append(actualErrorMsg).append("\n"); if (expectedErrorMsg == null) { // Exception is unexpected 
errorLog.append(String.format("Query:\n%s\nError Stack:\n%s\n", query, @@ -365,7 +366,6 @@ public class PlannerTestBase extends FrontendTestBase { } else { // Compare actual and expected error messages. if (expectedErrorMsg != null && !expectedErrorMsg.isEmpty()) { - String actualErrorMsg = e.getClass().getSimpleName() + ": " + e.getMessage(); if (!actualErrorMsg.toLowerCase().startsWith(expectedErrorMsg.toLowerCase())) { errorLog.append("query:\n" + query + "\nExpected error message: '" + expectedErrorMsg + "'\nActual error message: '" + actualErrorMsg + "'\n"); @@ -535,6 +535,10 @@ public class PlannerTestBase extends FrontendTestBase { if (!testOptions.contains(PlannerTestOption.VALIDATE_RESOURCES)) { resultFilters.addAll(TestUtils.RESOURCE_FILTERS); } + if (!testOptions.contains(PlannerTestOption.VALIDATE_CARDINALITY)) { + resultFilters.add(TestUtils.ROW_SIZE_FILTER); + resultFilters.add(TestUtils.CARDINALITY_FILTER); + } String planDiff = TestUtils.compareOutput( Lists.newArrayList(explainStr.split("\n")), expectedPlan, true, resultFilters); if (!planDiff.isEmpty()) { @@ -804,6 +808,11 @@ public class PlannerTestBase extends FrontendTestBase { // ignore differences in resource values). Operator- and fragment-level resource // requirements are only included if EXTENDED_EXPLAIN is also enabled. VALIDATE_RESOURCES, + // Verify the row size and cardinality fields in the plan. Default is + // to ignore these values (for backward compatibility.) Turn this option + // on for test that validate cardinality calculations: joins, scan + // cardinality, etc. 
+ VALIDATE_CARDINALITY } protected void runPlannerTestFile(String testFile, TQueryOptions options) { http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/test/java/org/apache/impala/testutil/TestUtils.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java index 5e678d2..26412fb 100644 --- a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java +++ b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java @@ -16,6 +16,7 @@ // under the License. package org.apache.impala.testutil; + import java.io.StringReader; import java.io.StringWriter; import java.text.SimpleDateFormat; @@ -80,8 +81,10 @@ public class TestUtils { public PathFilter(String prefix) { filterKey_ = prefix; } + @Override public boolean matches(String input) { return input.contains(filterKey_); } + @Override public String transform(String input) { String result = input.replaceFirst(filterKey_, ""); result = result.replaceAll(PATH_FILTER, " "); @@ -109,8 +112,10 @@ public class TestUtils { this.valueRegex = valueRegex; } + @Override public boolean matches(String input) { return input.contains(keyPrefix); } + @Override public String transform(String input) { return input.replaceAll(keyPrefix + valueRegex, keyPrefix); } @@ -121,6 +126,15 @@ public class TestUtils { public static final IgnoreValueFilter FILE_SIZE_FILTER = new IgnoreValueFilter("size", BYTE_VALUE_REGEX); + // Ignore the row-size=8B entries + public static final IgnoreValueFilter ROW_SIZE_FILTER = + new IgnoreValueFilter("row-size", "\\S+"); + + // Ignore cardinality=27.30K or cardinality=unavailable + // entries + public static final IgnoreValueFilter CARDINALITY_FILTER = + new IgnoreValueFilter("cardinality", "\\S+"); + // Ignore the exact estimated row count, which depends on the file sizes. 
static IgnoreValueFilter SCAN_RANGE_ROW_COUNT_FILTER = new IgnoreValueFilter("max-scan-range-rows", NUMBER_REGEX); @@ -160,6 +174,7 @@ public class TestUtils { } int mismatch = -1; // line in actual w/ mismatch int maxLen = Math.min(actual.size(), expected.size()); + outer: for (int i = 0; i < maxLen; ++i) { String expectedStr = expected.get(i).trim(); String actualStr = actual.get(i); @@ -192,33 +207,31 @@ public class TestUtils { } // do a whitespace-insensitive comparison - Scanner e = new Scanner(expectedStr); - Scanner a = new Scanner(actualStr); - while (a.hasNext() && e.hasNext()) { - if (containsPrefix) { - if (!a.next().contains(e.next())) { - mismatch = i; - break; - } - } else { - if (!a.next().equals(e.next())) { + try (Scanner e = new Scanner(expectedStr); + Scanner a = new Scanner(actualStr)) { + while (a.hasNext() && e.hasNext()) { + String aToken = a.next(); + String eToken = e.next(); + if (containsPrefix) { + if (!aToken.contains(eToken)) { + mismatch = i; + break outer; + } + } else if (!aToken.equals(eToken)) { mismatch = i; - break; + break outer; } } - } - if (mismatch != -1) { - break; - } - if (ignoreAfter) { - if (e.hasNext() && !a.hasNext()) { + if (ignoreAfter) { + if (e.hasNext() && !a.hasNext()) { + mismatch = i; + break outer; + } + } else if (a.hasNext() != e.hasNext()) { mismatch = i; - break; + break outer; } - } else if (a.hasNext() != e.hasNext()) { - mismatch = i; - break; } } if (mismatch == -1 && actual.size() < expected.size()) { http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test index cf5a78b..18230ab 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test +++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test @@ -7,22 +7,27 @@ PLAN-ROOT SINK | 01:AGGREGATE [FINALIZE] | output: count(*), count(tinyint_col), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col) +| row-size=34B cardinality=1 | 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB + row-size=1B cardinality=11.00K ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | 03:AGGREGATE [FINALIZE] | output: count:merge(*), count:merge(tinyint_col), min:merge(tinyint_col), max:merge(tinyint_col), sum:merge(tinyint_col), avg:merge(tinyint_col) +| row-size=34B cardinality=1 | 02:EXCHANGE [UNPARTITIONED] | 01:AGGREGATE | output: count(*), count(tinyint_col), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col) +| row-size=34B cardinality=1 | 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB + row-size=1B cardinality=11.00K ==== # with grouping select tinyint_col, bigint_col, count(*), min(tinyint_col), max(tinyint_col), sum(tinyint_col), @@ -35,9 +40,11 @@ PLAN-ROOT SINK 01:AGGREGATE [FINALIZE] | output: count(*), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col) | group by: bigint_col, tinyint_col +| row-size=35B cardinality=9.07K | 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB + row-size=9B cardinality=11.00K ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -46,15 +53,18 @@ PLAN-ROOT SINK 03:AGGREGATE [FINALIZE] | output: count:merge(*), min:merge(tinyint_col), max:merge(tinyint_col), sum:merge(tinyint_col), avg:merge(tinyint_col) | group by: bigint_col, tinyint_col +| row-size=35B cardinality=9.07K | 02:EXCHANGE [HASH(bigint_col,tinyint_col)] | 01:AGGREGATE [STREAMING] | output: count(*), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col) | group by: bigint_col, tinyint_col +| row-size=35B cardinality=9.07K | 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB + row-size=9B 
cardinality=11.00K ==== # avg substitution select avg(id) @@ -66,30 +76,37 @@ PLAN-ROOT SINK | 02:TOP-N [LIMIT=10] | order by: avg(zip) ASC +| row-size=16B cardinality=0 | 01:AGGREGATE [FINALIZE] | output: avg(id), count(id), avg(zip) | having: count(id) > 0 +| row-size=24B cardinality=0 | 00:SCAN HDFS [functional.testtbl] partitions=1/1 files=0 size=0B + row-size=12B cardinality=0 ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | 02:TOP-N [LIMIT=10] | order by: avg(zip) ASC +| row-size=16B cardinality=0 | 04:AGGREGATE [FINALIZE] | output: avg:merge(id), count:merge(id), avg:merge(zip) | having: count(id) > 0 +| row-size=24B cardinality=0 | 03:EXCHANGE [UNPARTITIONED] | 01:AGGREGATE | output: avg(id), count(id), avg(zip) +| row-size=24B cardinality=0 | 00:SCAN HDFS [functional.testtbl] partitions=1/1 files=0 size=0B + row-size=12B cardinality=0 ==== # Test correct removal of redundant group-by expressions (IMPALA-817) select int_col + int_col, int_col * int_col @@ -103,9 +120,11 @@ PLAN-ROOT SINK | group by: int_col + int_col, int_col * int_col | having: int_col * int_col < 0 | limit: 10 +| row-size=16B cardinality=10 | 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB + row-size=4B cardinality=11.00K ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -116,14 +135,17 @@ PLAN-ROOT SINK | group by: int_col + int_col, int_col * int_col | having: int_col * int_col < 0 | limit: 10 +| row-size=16B cardinality=10 | 02:EXCHANGE [HASH(int_col + int_col,int_col * int_col)] | 01:AGGREGATE [STREAMING] | group by: int_col + int_col, int_col * int_col +| row-size=16B cardinality=11.00K | 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB + row-size=4B cardinality=11.00K ==== # Tests that a having predicate triggers slot materialization (IMPALA-846). 
select count(*) from @@ -138,17 +160,21 @@ PLAN-ROOT SINK | output: count(*), count(t2.int_col), count(t1.bigint_col) | group by: t1.tinyint_col, t2.smallint_col | having: count(t2.int_col) = count(t1.bigint_col) +| row-size=27B cardinality=2 | 02:HASH JOIN [INNER JOIN] | hash predicates: t1.smallint_col = t2.smallint_col | runtime filters: RF000 <- t2.smallint_col +| row-size=17B cardinality=5.84K | |--01:SCAN HDFS [functional.alltypestiny t2] | partitions=4/4 files=4 size=460B +| row-size=6B cardinality=8 | 00:SCAN HDFS [functional.alltypes t1] partitions=24/24 files=24 size=478.45KB runtime filters: RF000 -> t1.smallint_col + row-size=11B cardinality=7.30K ==== # Tests proper slot materialization of agg-tuple slots for avg (IMP-1271). # 't.x > 10' is picked up as an unassigned conjunct, and not as a binding @@ -165,9 +191,11 @@ PLAN-ROOT SINK | output: avg(bigint_col) | group by: int_col | having: avg(bigint_col) > 10 +| row-size=12B cardinality=1 | 00:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=12B cardinality=7.30K ==== # test distributed aggregation over unions (IMPALA-831) # non-distinct agg without grouping over a union @@ -182,35 +210,44 @@ PLAN-ROOT SINK 03:AGGREGATE [FINALIZE] | output: count(*) | limit: 10 +| row-size=8B cardinality=1 | 00:UNION | pass-through-operands: all +| row-size=0B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=0B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=0B cardinality=7.30K ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | 05:AGGREGATE [FINALIZE] | output: count:merge(*) | limit: 10 +| row-size=8B cardinality=1 | 04:EXCHANGE [UNPARTITIONED] | 03:AGGREGATE | output: count(*) +| row-size=8B cardinality=1 | 00:UNION | pass-through-operands: all +| row-size=0B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=0B 
cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=0B cardinality=7.30K ==== # non-distinct agg with grouping over a union select count(*) from @@ -226,15 +263,19 @@ PLAN-ROOT SINK | output: count(*) | group by: bigint_col | limit: 10 +| row-size=16B cardinality=10 | 00:UNION | pass-through-operands: all +| row-size=8B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=8B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=8B cardinality=7.30K ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -245,21 +286,26 @@ PLAN-ROOT SINK | output: count:merge(*) | group by: t.bigint_col | limit: 10 +| row-size=16B cardinality=10 | 04:EXCHANGE [HASH(t.bigint_col)] | 03:AGGREGATE [STREAMING] | output: count(*) | group by: bigint_col +| row-size=16B cardinality=20 | 00:UNION | pass-through-operands: all +| row-size=8B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=8B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=8B cardinality=7.30K ==== # distinct agg without grouping over a union select count(distinct int_col) @@ -274,46 +320,58 @@ PLAN-ROOT SINK 04:AGGREGATE [FINALIZE] | output: count(int_col) | limit: 10 +| row-size=8B cardinality=1 | 03:AGGREGATE | group by: int_col +| row-size=4B cardinality=20 | 00:UNION | pass-through-operands: all +| row-size=4B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=4B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=4B cardinality=7.30K ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | 08:AGGREGATE [FINALIZE] | output: count:merge(int_col) | limit: 10 +| row-size=8B cardinality=1 | 07:EXCHANGE [UNPARTITIONED] | 04:AGGREGATE | output: count(int_col) +| row-size=8B 
cardinality=1 | 06:AGGREGATE | group by: int_col +| row-size=4B cardinality=20 | 05:EXCHANGE [HASH(int_col)] | 03:AGGREGATE [STREAMING] | group by: int_col +| row-size=4B cardinality=20 | 00:UNION | pass-through-operands: all +| row-size=4B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=4B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=4B cardinality=7.30K ==== # distinct agg with grouping over a union select count(distinct int_col) @@ -330,18 +388,23 @@ PLAN-ROOT SINK | output: count(int_col) | group by: t.bigint_col | limit: 10 +| row-size=16B cardinality=10 | 03:AGGREGATE | group by: bigint_col, int_col +| row-size=12B cardinality=400 | 00:UNION | pass-through-operands: all +| row-size=12B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=12B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=12B cardinality=7.30K ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -352,29 +415,36 @@ PLAN-ROOT SINK | output: count:merge(int_col) | group by: t.bigint_col | limit: 10 +| row-size=16B cardinality=10 | 07:EXCHANGE [HASH(t.bigint_col)] | 04:AGGREGATE [STREAMING] | output: count(int_col) | group by: t.bigint_col +| row-size=16B cardinality=10 | 06:AGGREGATE | group by: t.bigint_col, int_col +| row-size=12B cardinality=400 | 05:EXCHANGE [HASH(t.bigint_col,int_col)] | 03:AGGREGATE [STREAMING] | group by: bigint_col, int_col +| row-size=12B cardinality=400 | 00:UNION | pass-through-operands: all +| row-size=12B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=12B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=12B cardinality=7.30K ==== # mixed distinct and non-distinct agg without grouping over a union select count(smallint_col), 
count(distinct int_col) @@ -389,49 +459,61 @@ PLAN-ROOT SINK 04:AGGREGATE [FINALIZE] | output: count(int_col), count:merge(smallint_col) | limit: 10 +| row-size=16B cardinality=1 | 03:AGGREGATE | output: count(smallint_col) | group by: int_col +| row-size=12B cardinality=20 | 00:UNION | pass-through-operands: all +| row-size=6B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=6B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=6B cardinality=7.30K ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | 08:AGGREGATE [FINALIZE] | output: count:merge(int_col), count:merge(smallint_col) | limit: 10 +| row-size=16B cardinality=1 | 07:EXCHANGE [UNPARTITIONED] | 04:AGGREGATE | output: count(int_col), count:merge(smallint_col) +| row-size=16B cardinality=1 | 06:AGGREGATE | output: count:merge(smallint_col) | group by: int_col +| row-size=12B cardinality=20 | 05:EXCHANGE [HASH(int_col)] | 03:AGGREGATE [STREAMING] | output: count(smallint_col) | group by: int_col +| row-size=12B cardinality=20 | 00:UNION | pass-through-operands: all +| row-size=6B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=6B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=6B cardinality=7.30K ==== # mixed distinct and non-distinct agg with grouping over a union select count(smallint_col), count(distinct int_col) @@ -448,19 +530,24 @@ PLAN-ROOT SINK | output: count(int_col), count:merge(smallint_col) | group by: t.bigint_col | limit: 10 +| row-size=24B cardinality=10 | 03:AGGREGATE | output: count(smallint_col) | group by: bigint_col, int_col +| row-size=20B cardinality=400 | 00:UNION | pass-through-operands: all +| row-size=14B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=14B cardinality=100 | 01:SCAN HDFS 
[functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=14B cardinality=7.30K ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -471,31 +558,38 @@ PLAN-ROOT SINK | output: count:merge(int_col), count:merge(smallint_col) | group by: t.bigint_col | limit: 10 +| row-size=24B cardinality=10 | 07:EXCHANGE [HASH(t.bigint_col)] | 04:AGGREGATE [STREAMING] | output: count(int_col), count:merge(smallint_col) | group by: t.bigint_col +| row-size=24B cardinality=10 | 06:AGGREGATE | output: count:merge(smallint_col) | group by: t.bigint_col, int_col +| row-size=20B cardinality=400 | 05:EXCHANGE [HASH(t.bigint_col,int_col)] | 03:AGGREGATE [STREAMING] | output: count(smallint_col) | group by: bigint_col, int_col +| row-size=20B cardinality=400 | 00:UNION | pass-through-operands: all +| row-size=14B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=14B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=14B cardinality=7.30K ==== # mixed distinct and non-distinct agg with grouping over a union distinct select count(smallint_col), count(distinct int_col) @@ -512,22 +606,28 @@ PLAN-ROOT SINK | output: count(int_col), count:merge(smallint_col) | group by: t.bigint_col | limit: 10 +| row-size=24B cardinality=10 | 04:AGGREGATE | output: count(smallint_col) | group by: bigint_col, int_col +| row-size=20B cardinality=400 | 03:AGGREGATE [FINALIZE] | group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month +| row-size=89B cardinality=7.40K | 00:UNION | pass-through-operands: all +| row-size=89B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=89B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=89B cardinality=7.30K ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ 
-538,39 +638,48 @@ PLAN-ROOT SINK | output: count:merge(int_col), count:merge(smallint_col) | group by: t.bigint_col | limit: 10 +| row-size=24B cardinality=10 | 10:EXCHANGE [HASH(t.bigint_col)] | 05:AGGREGATE [STREAMING] | output: count(int_col), count:merge(smallint_col) | group by: t.bigint_col +| row-size=24B cardinality=10 | 09:AGGREGATE | output: count:merge(smallint_col) | group by: t.bigint_col, int_col +| row-size=20B cardinality=400 | 08:EXCHANGE [HASH(t.bigint_col,int_col)] | 04:AGGREGATE [STREAMING] | output: count(smallint_col) | group by: bigint_col, int_col +| row-size=20B cardinality=400 | 07:AGGREGATE [FINALIZE] | group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month +| row-size=89B cardinality=7.40K | 06:EXCHANGE [HASH(id,bool_col,tinyint_col,smallint_col,int_col,bigint_col,float_col,double_col,date_string_col,string_col,timestamp_col,year,month)] | 03:AGGREGATE [STREAMING] | group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month +| row-size=89B cardinality=7.40K | 00:UNION | pass-through-operands: all +| row-size=89B cardinality=7.40K | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=89B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=89B cardinality=7.30K ==== # Mixed distinct and non-distinct agg with intermediate type different from input type # Regression test for IMPALA-5251 to exercise validateMergeAggFn() in FunctionCallExpr. 
@@ -581,36 +690,44 @@ PLAN-ROOT SINK | 02:AGGREGATE [FINALIZE] | output: count(l_partkey), avg:merge(l_quantity), ndv:merge(l_discount) +| row-size=24B cardinality=1 | 01:AGGREGATE | output: avg(l_quantity), ndv(l_discount) | group by: l_partkey +| row-size=24B cardinality=200.52K | 00:SCAN HDFS [tpch_parquet.lineitem] - partitions=1/1 files=3 size=193.92MB + partitions=1/1 files=3 size=193.60MB + row-size=24B cardinality=6.00M ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | 06:AGGREGATE [FINALIZE] | output: count:merge(l_partkey), avg:merge(l_quantity), ndv:merge(l_discount) +| row-size=24B cardinality=1 | 05:EXCHANGE [UNPARTITIONED] | 02:AGGREGATE | output: count(l_partkey), avg:merge(l_quantity), ndv:merge(l_discount) +| row-size=24B cardinality=1 | 04:AGGREGATE | output: avg:merge(l_quantity), ndv:merge(l_discount) | group by: l_partkey +| row-size=24B cardinality=200.52K | 03:EXCHANGE [HASH(l_partkey)] | 01:AGGREGATE [STREAMING] | output: avg(l_quantity), ndv(l_discount) | group by: l_partkey +| row-size=24B cardinality=200.52K | 00:SCAN HDFS [tpch_parquet.lineitem] - partitions=1/1 files=3 size=193.92MB + partitions=1/1 files=3 size=193.60MB + row-size=24B cardinality=6.00M ==== # test that aggregations are not placed below an unpartitioned exchange with a limit select count(*) from (select * from functional.alltypes limit 10) t @@ -619,15 +736,18 @@ PLAN-ROOT SINK | 01:AGGREGATE [FINALIZE] | output: count(*) +| row-size=8B cardinality=1 | 00:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB limit: 10 + row-size=0B cardinality=10 ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | 01:AGGREGATE [FINALIZE] | output: count(*) +| row-size=8B cardinality=1 | 02:EXCHANGE [UNPARTITIONED] | limit: 10 @@ -635,6 +755,7 @@ PLAN-ROOT SINK 00:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB limit: 10 + row-size=0B cardinality=10 ==== # test that aggregations are not placed below an unpartitioned exchange with a limit select count(*) from @@ -646,21 
+767,26 @@ PLAN-ROOT SINK | 03:AGGREGATE [FINALIZE] | output: count(*) +| row-size=8B cardinality=1 | 00:UNION | pass-through-operands: all | limit: 10 +| row-size=0B cardinality=10 | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=0B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=0B cardinality=7.30K ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | 03:AGGREGATE [FINALIZE] | output: count(*) +| row-size=8B cardinality=1 | 04:EXCHANGE [UNPARTITIONED] | limit: 10 @@ -668,12 +794,15 @@ PLAN-ROOT SINK 00:UNION | pass-through-operands: all | limit: 10 +| row-size=0B cardinality=10 | |--02:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB +| row-size=0B cardinality=100 | 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB + row-size=0B cardinality=7.30K ==== # test that limits are applied at the top-level merge aggregation node for non-grouping # distinct aggregation (IMPALA-1802) @@ -690,66 +819,82 @@ PLAN-ROOT SINK 06:AGGREGATE [FINALIZE] | output: count(cnt) | limit: 1 +| row-size=8B cardinality=1 | 05:AGGREGATE | group by: count(t1.id) +| row-size=8B cardinality=1 | 04:AGGREGATE [FINALIZE] | output: count(t1.id) | limit: 10 +| row-size=8B cardinality=1 | 03:AGGREGATE | group by: t1.id +| row-size=4B cardinality=9 | 02:HASH JOIN [INNER JOIN] | hash predicates: t1.id = t2.id | runtime filters: RF000 <- t2.id +| row-size=8B cardinality=9 | |--01:SCAN HDFS [functional.alltypestiny t2] | partitions=4/4 files=4 size=460B +| row-size=4B cardinality=8 | 00:SCAN HDFS [functional.alltypesagg t1] partitions=11/11 files=11 size=814.73KB runtime filters: RF000 -> t1.id + row-size=4B cardinality=11.00K ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | 06:AGGREGATE [FINALIZE] | output: count(cnt) | limit: 1 +| row-size=8B cardinality=1 | 05:AGGREGATE | group by: count(t1.id) +| row-size=8B cardinality=1 | 11:AGGREGATE [FINALIZE] | output: count:merge(t1.id) | 
limit: 10 +| row-size=8B cardinality=1 | 10:EXCHANGE [UNPARTITIONED] | 04:AGGREGATE | output: count(t1.id) +| row-size=8B cardinality=1 | 09:AGGREGATE | group by: t1.id +| row-size=4B cardinality=9 | 08:EXCHANGE [HASH(t1.id)] | 03:AGGREGATE [STREAMING] | group by: t1.id +| row-size=4B cardinality=9 | 02:HASH JOIN [INNER JOIN, BROADCAST] | hash predicates: t1.id = t2.id | runtime filters: RF000 <- t2.id +| row-size=8B cardinality=9 | |--07:EXCHANGE [BROADCAST] | | | 01:SCAN HDFS [functional.alltypestiny t2] | partitions=4/4 files=4 size=460B +| row-size=4B cardinality=8 | 00:SCAN HDFS [functional.alltypesagg t1] partitions=11/11 files=11 size=814.73KB runtime filters: RF000 -> t1.id + row-size=4B cardinality=11.00K ==== # IMPALA-2089: Tests correct elimination of redundant predicates. # The equivalences between inline-view slots are enforced inside the inline-view plan. @@ -768,10 +913,12 @@ PLAN-ROOT SINK 01:AGGREGATE [FINALIZE] | group by: tinyint_col, smallint_col, int_col + int_col, coalesce(bigint_col, year) | having: int_col + int_col = coalesce(bigint_col, year), smallint_col = int_col + int_col +| row-size=19B cardinality=730 | 00:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB predicates: functional.alltypes.tinyint_col = functional.alltypes.smallint_col + row-size=19B cardinality=730 ==== # IMPALA-1917: Test NULL literals inside inline view with grouping aggregation. select cnt from @@ -784,9 +931,11 @@ PLAN-ROOT SINK 01:AGGREGATE [FINALIZE] | output: count(*) | group by: bool_col, CAST(NULL AS INT) +| row-size=13B cardinality=2 | 00:SCAN HDFS [functional.alltypestiny] partitions=4/4 files=4 size=460B + row-size=1B cardinality=8 ==== # IMPALA-1917: Test NULL literals inside inline view with grouping aggregation. 
 select cnt from
@@ -799,12 +948,15 @@ PLAN-ROOT SINK
 02:AGGREGATE [FINALIZE]
 |  output: count(int_col)
 |  group by: bool_col, NULL
+|  row-size=10B cardinality=2
 |
 01:AGGREGATE
 |  group by: bool_col, NULL, int_col
+|  row-size=6B cardinality=4
 |
 00:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
+   row-size=5B cardinality=8
 ====
 # test simple group_concat with distinct
 select group_concat(distinct string_col) from functional.alltypesagg
@@ -813,33 +965,41 @@ PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col)
+|  row-size=12B cardinality=1
 |
 01:AGGREGATE
 |  group by: string_col
+|  row-size=15B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=15B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: group_concat:merge(string_col)
+|  row-size=12B cardinality=1
 |
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
 |  output: group_concat(string_col)
+|  row-size=12B cardinality=1
 |
 04:AGGREGATE
 |  group by: string_col
+|  row-size=15B cardinality=963
 |
 03:EXCHANGE [HASH(string_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: string_col
+|  row-size=15B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=15B cardinality=11.00K
 ====
 # test group_concat and a group by
 select day, group_concat(distinct string_col)
@@ -851,25 +1011,31 @@ PLAN-ROOT SINK
 03:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col)
 |  group by: day
+|  row-size=16B cardinality=11
 |
 02:AGGREGATE
 |  group by: day, string_col
+|  row-size=19B cardinality=1.10K
 |
 01:TOP-N [LIMIT=99999]
 |  order by: id ASC
+|  row-size=23B cardinality=1.10K
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
    predicates: day = id % 100
+   row-size=23B cardinality=1.10K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col)
 |  group by: day
+|  row-size=16B cardinality=11
 |
 02:AGGREGATE
 |  group by: day, string_col
+|  row-size=19B cardinality=1.10K
 |
 04:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: id ASC
@@ -877,10 +1043,12 @@ PLAN-ROOT SINK
 |
 01:TOP-N [LIMIT=99999]
 |  order by: id ASC
+|  row-size=23B cardinality=1.10K
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
    predicates: day = id % 100
+   row-size=23B cardinality=1.10K
 ====
 # test group_concat with distinct together with another distinct aggregate function
 select count(distinct cast(timestamp_col as string)),
@@ -892,12 +1060,15 @@ PLAN-ROOT SINK
 02:AGGREGATE [FINALIZE]
 |  output: count(CAST(timestamp_col AS STRING)), group_concat(CAST(timestamp_col AS STRING))
 |  group by: year
+|  row-size=24B cardinality=1
 |
 01:AGGREGATE
 |  group by: year, CAST(timestamp_col AS STRING)
+|  row-size=20B cardinality=10.21K
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=20B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -906,23 +1077,28 @@ PLAN-ROOT SINK
 06:AGGREGATE [FINALIZE]
 |  output: count:merge(CAST(timestamp_col AS STRING)), group_concat:merge(CAST(timestamp_col AS STRING))
 |  group by: year
+|  row-size=24B cardinality=1
 |
 05:EXCHANGE [HASH(year)]
 |
 02:AGGREGATE [STREAMING]
 |  output: count(CAST(timestamp_col AS STRING)), group_concat(CAST(timestamp_col AS STRING))
 |  group by: year
+|  row-size=24B cardinality=1
 |
 04:AGGREGATE
 |  group by: year, CAST(timestamp_col AS STRING)
+|  row-size=20B cardinality=10.21K
 |
 03:EXCHANGE [HASH(year,CAST(timestamp_col AS STRING))]
 |
 01:AGGREGATE [STREAMING]
 |  group by: year, CAST(timestamp_col AS STRING)
+|  row-size=20B cardinality=10.21K
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=20B cardinality=11.00K
 ====
 # test group_concat distinct with other non-distinct aggregate functions
 select group_concat(distinct string_col), count(*) from functional.alltypesagg
@@ -931,36 +1107,44 @@ PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col), count:merge(*)
+|  row-size=20B cardinality=1
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: string_col
+|  row-size=23B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=15B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: group_concat:merge(string_col), count:merge(*)
+|  row-size=20B cardinality=1
 |
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
 |  output: group_concat(string_col), count:merge(*)
+|  row-size=20B cardinality=1
 |
 04:AGGREGATE
 |  output: count:merge(*)
 |  group by: string_col
+|  row-size=23B cardinality=963
 |
 03:EXCHANGE [HASH(string_col)]
 |
 01:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: string_col
+|  row-size=23B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=15B cardinality=11.00K
 ====
 # test group_concat distinct with other aggregate functions, with custom separator
 select group_concat(distinct string_col, '-'), sum(int_col), count(distinct string_col)
@@ -970,36 +1154,44 @@ PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col, '-'), count(string_col), sum:merge(int_col)
+|  row-size=28B cardinality=1
 |
 01:AGGREGATE
 |  output: sum(int_col)
 |  group by: string_col
+|  row-size=23B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=19B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: group_concat:merge(string_col, '-'), count:merge(string_col), sum:merge(int_col)
+|  row-size=28B cardinality=1
 |
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
 |  output: group_concat(string_col, '-'), count(string_col), sum:merge(int_col)
+|  row-size=28B cardinality=1
 |
 04:AGGREGATE
 |  output: sum:merge(int_col)
 |  group by: string_col
+|  row-size=23B cardinality=963
 |
 03:EXCHANGE [HASH(string_col)]
 |
 01:AGGREGATE [STREAMING]
 |  output: sum(int_col)
 |  group by: string_col
+|  row-size=23B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=19B cardinality=11.00K
 ====
 # test group_concat distinct with other aggregate functions, with custom separator
 # and a group by
@@ -1012,13 +1204,16 @@ PLAN-ROOT SINK
 02:AGGREGATE [FINALIZE]
 |  output: count(date_string_col), group_concat(date_string_col, '-'), count:merge(*)
 |  group by: month, year
+|  row-size=36B cardinality=1
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: month, year, date_string_col
+|  row-size=36B cardinality=10
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=28B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -1027,25 +1222,30 @@ PLAN-ROOT SINK
 06:AGGREGATE [FINALIZE]
 |  output: count:merge(date_string_col), group_concat:merge(date_string_col, '-'), count:merge(*)
 |  group by: month, year
+|  row-size=36B cardinality=1
 |
 05:EXCHANGE [HASH(month,year)]
 |
 02:AGGREGATE [STREAMING]
 |  output: count(date_string_col), group_concat(date_string_col, '-'), count:merge(*)
 |  group by: month, year
+|  row-size=36B cardinality=1
 |
 04:AGGREGATE
 |  output: count:merge(*)
 |  group by: month, year, date_string_col
+|  row-size=36B cardinality=10
 |
 03:EXCHANGE [HASH(month,year,date_string_col)]
 |
 01:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: month, year, date_string_col
+|  row-size=36B cardinality=10
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=28B cardinality=11.00K
 ====
 # test multiple group_concat distinct, each with a different separator
 select group_concat(distinct string_col), group_concat(distinct string_col, '-'),
@@ -1055,33 +1255,41 @@ PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col), group_concat(string_col, '-'), group_concat(string_col, '---')
+|  row-size=36B cardinality=1
 |
 01:AGGREGATE
 |  group by: string_col
+|  row-size=15B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=15B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: group_concat:merge(string_col), group_concat:merge(string_col, '-'), group_concat:merge(string_col, '---')
+|  row-size=36B cardinality=1
 |
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
 |  output: group_concat(string_col), group_concat(string_col, '-'), group_concat(string_col, '---')
+|  row-size=36B cardinality=1
 |
 04:AGGREGATE
 |  group by: string_col
+|  row-size=15B cardinality=963
 |
 03:EXCHANGE [HASH(string_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: string_col
+|  row-size=15B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=15B cardinality=11.00K
 ====
 # IMPALA-852: Aggregation only in the HAVING clause.
 select 1 from functional.alltypestiny having count(*) > 0
@@ -1091,9 +1299,11 @@ PLAN-ROOT SINK
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  having: count(*) > 0
+|  row-size=8B cardinality=0
 |
 00:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
+   row-size=0B cardinality=8
 ====
 # Grouping aggregation where input is partitioned on grouping expr.
 # Planner should not redundantly repartition the data that was already partitioned on
@@ -1115,22 +1325,26 @@ PLAN-ROOT SINK
 |  group by: c_custkey
 |  having: count(*) < 150000
 |  limit: 1000000
+|  row-size=16B cardinality=15.00K
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: c_custkey = o_custkey
 |  runtime filters: RF000 <- o_custkey
+|  row-size=18B cardinality=91.47K
 |
 |--05:EXCHANGE [HASH(o_custkey)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.orders]
-|     partitions=1/1 files=2 size=54.20MB
+|     partitions=1/1 files=2 size=54.07MB
+|     row-size=8B cardinality=1.50M
 |
 04:EXCHANGE [HASH(c_custkey)]
 |
 00:SCAN HDFS [tpch_parquet.customer]
-   partitions=1/1 files=1 size=12.34MB
+   partitions=1/1 files=1 size=12.31MB
    predicates: c_nationkey = 16
    runtime filters: RF000 -> c_custkey
+   row-size=10B cardinality=6.00K
 ====
 # Distinct aggregation where input is partitioned on distinct expr.
 # Planner should not redundantly repartition the data that was already partitioned on
@@ -1147,29 +1361,35 @@ PLAN-ROOT SINK
 |  output: count:merge(c_custkey)
 |  having: count(c_custkey) > 50
 |  limit: 50
+|  row-size=8B cardinality=0
 |
 07:EXCHANGE [UNPARTITIONED]
 |
 04:AGGREGATE
 |  output: count(c_custkey)
+|  row-size=8B cardinality=0
 |
 03:AGGREGATE
 |  group by: c_custkey
+|  row-size=8B cardinality=150.00K
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: o_custkey = c_custkey
 |  runtime filters: RF000 <- c_custkey
+|  row-size=16B cardinality=1.50M
 |
 |--06:EXCHANGE [HASH(c_custkey)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.customer]
-|     partitions=1/1 files=1 size=12.34MB
+|     partitions=1/1 files=1 size=12.31MB
+|     row-size=8B cardinality=150.00K
 |
 05:EXCHANGE [HASH(o_custkey)]
 |
 00:SCAN HDFS [tpch_parquet.orders]
-   partitions=1/1 files=2 size=54.20MB
+   partitions=1/1 files=2 size=54.07MB
    runtime filters: RF000 -> o_custkey
+   row-size=8B cardinality=1.50M
 ====
 # Distinct grouping aggregation where input is partitioned on distinct and grouping exprs.
 # Planner should not redundantly repartition the data that was already partitioned on
@@ -1185,30 +1405,36 @@ PLAN-ROOT SINK
 08:AGGREGATE [FINALIZE]
 |  output: count:merge(c_custkey)
 |  group by: c_custkey
+|  row-size=16B cardinality=150.00K
 |
 07:EXCHANGE [HASH(c_custkey)]
 |
 04:AGGREGATE [STREAMING]
 |  output: count(c_custkey)
 |  group by: c_custkey
+|  row-size=16B cardinality=150.00K
 |
 03:AGGREGATE
 |  group by: c_custkey, c_custkey
+|  row-size=16B cardinality=1.50M
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: o_custkey = c_custkey
 |  runtime filters: RF000 <- c_custkey
+|  row-size=16B cardinality=1.50M
 |
 |--06:EXCHANGE [HASH(c_custkey)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.customer]
-|     partitions=1/1 files=1 size=12.34MB
+|     partitions=1/1 files=1 size=12.31MB
+|     row-size=8B cardinality=150.00K
 |
 05:EXCHANGE [HASH(o_custkey)]
 |
 00:SCAN HDFS [tpch_parquet.orders]
-   partitions=1/1 files=2 size=54.20MB
+   partitions=1/1 files=2 size=54.07MB
    runtime filters: RF000 -> o_custkey
+   row-size=8B cardinality=1.50M
 ====
 # Complex aggregation when two joins and an agg end up in same fragment.
 select l_orderkey, l_returnflag, count(*) from (
@@ -1233,31 +1459,37 @@ PLAN-ROOT SINK
 |  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_returnflag
 |  having: count(*) > 10
 |  limit: 10
+|  row-size=29B cardinality=10
 |
 04:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: o_custkey = c_custkey, o_comment = c_phone
 |  runtime filters: RF000 <- c_custkey, RF001 <- c_phone
+|  row-size=160B cardinality=607.19K
 |
 |--08:EXCHANGE [BROADCAST]
 |  |
 |  02:SCAN HDFS [tpch_parquet.customer]
-|     partitions=1/1 files=1 size=12.34MB
+|     partitions=1/1 files=1 size=12.31MB
+|     row-size=35B cardinality=150.00K
 |
 03:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: l_orderkey = o_orderkey, l_returnflag = o_clerk
 |  runtime filters: RF004 <- o_orderkey, RF005 <- o_clerk
+|  row-size=125B cardinality=5.76M
 |
 |--07:EXCHANGE [HASH(o_orderkey,o_clerk)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.orders]
-|     partitions=1/1 files=2 size=54.20MB
+|     partitions=1/1 files=2 size=54.07MB
 |     runtime filters: RF000 -> o_custkey, RF001 -> o_comment
+|     row-size=104B cardinality=1.50M
 |
 06:EXCHANGE [HASH(l_orderkey,l_returnflag)]
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.92MB
+   partitions=1/1 files=3 size=193.60MB
    runtime filters: RF004 -> l_orderkey, RF005 -> l_returnflag
+   row-size=21B cardinality=6.00M
 ====
 # IMPALA-4263: Grouping agg needs a merge step because the grouping exprs reference a
 # tuple that is made nullable in the join fragment.
@@ -1274,25 +1506,30 @@ PLAN-ROOT SINK
 07:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
 |  group by: t2.id
+|  row-size=12B cardinality=99
 |
 06:EXCHANGE [HASH(t2.id)]
 |
 03:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: t2.id
+|  row-size=12B cardinality=99
 |
 02:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
 |  hash predicates: t1.id = t2.id
+|  row-size=8B cardinality=7.30K
 |
 |--05:EXCHANGE [HASH(t2.id)]
 |  |
 |  01:SCAN HDFS [functional.alltypessmall t2]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=4B cardinality=100
 |
 04:EXCHANGE [HASH(t1.id)]
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ====
 # IMPALA-4263: Grouping agg is placed in the join fragment and has no merge step.
 select /* +straight_join */ t1.id, count(*)
@@ -1308,19 +1545,23 @@ PLAN-ROOT SINK
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: t1.id
+|  row-size=12B cardinality=7.30K
 |
 02:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
 |  hash predicates: t1.id = t2.id
+|  row-size=8B cardinality=7.30K
 |
 |--05:EXCHANGE [HASH(t2.id)]
 |  |
 |  01:SCAN HDFS [functional.alltypessmall t2]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=4B cardinality=100
 |
 04:EXCHANGE [HASH(t1.id)]
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ====
 # IMPALA-4263: Grouping agg is placed in the second join fragment and has no merge step.
 # The grouping exprs reference a nullable tuple (t2), but that tuple is made nullable in
@@ -1341,27 +1582,33 @@ PLAN-ROOT SINK
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: t2.id
+|  row-size=12B cardinality=99
 |
 04:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
 |  hash predicates: t2.id = t3.id
+|  row-size=16B cardinality=73.00K
 |
 |--09:EXCHANGE [HASH(t3.id)]
 |  |
 |  02:SCAN HDFS [functional.alltypestiny t3]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 08:EXCHANGE [HASH(t2.id)]
 |
 03:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
 |  hash predicates: t1.int_col = t2.int_col
+|  row-size=12B cardinality=73.00K
 |
 |--07:EXCHANGE [HASH(t2.int_col)]
 |  |
 |  01:SCAN HDFS [functional.alltypessmall t2]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=8B cardinality=100
 |
 06:EXCHANGE [HASH(t1.int_col)]
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ====