IMPALA-8021: Add estimated cardinality to EXPLAIN output

Cardinality is vital to understanding why a plan has the form it does,
yet the planner normally emits cardinality information only at the more
detailed explain levels. Unfortunately, most query profiles we see are
captured at the standard level, which omits this information (except in
the summary table), making it hard to understand what happened.

This patch adds cardinality to the standard EXPLAIN output. It also
changes the displayed cardinality value to be in abbreviated "metric"
form: 1.23K instead of 1234, etc.
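
For example, with the PrintUtils helpers added below:

    PrintUtils.printMetric(1234);      // "1.23K"
    PrintUtils.printMetric(7300000);   // "7.30M" (shown as cardinality=7.30M)
    PrintUtils.printCardinality(-1);   // "unavailable" (no stats)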

Changing the EXPLAIN output has a huge impact on PlannerTest: all the
"golden" test files must change. To avoid doing this twice, this patch
also includes:

IMPALA-7919: Add predicates line in plan output for partition key
predicates

This is also a good time to include:

IMPALA-8022: Add cardinality checks to PlannerTest

The comparison code was changed to accept a set of validators, one of
which checks that each cardinality is within 5% of the expected value.
This should ensure we don't change estimates unintentionally.
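
As a minimal sketch of that check (the helper name here is hypothetical;
it builds on the PrintUtils.decodeMetric() parser added below):

    // Hypothetical sketch only: accept an actual metric-encoded
    // cardinality if it is within 5% of the expected one.
    static boolean withinTolerance(String expected, String actual) {
      double e = PrintUtils.decodeMetric(expected);  // e.g. "7.30M" -> 7.3e6
      double a = PrintUtils.decodeMetric(actual);
      if (e < 0 || a < 0) return false;  // decodeMetric() returns -1 on bad input
      if (e == 0) return a == 0;
      return Math.abs(a - e) / e <= 0.05;
    }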

While many planner tests are concerned with cardinality, many others are
not, and testing showed that cardinality estimates are unstable for some
of them. For such tests, this patch adds result filters that ignore the
cardinality values. The filters are enabled by default (for backward
compatibility) but disabled (to allow cardinality verification) for the
critical tests.
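
The filters are IgnoreValueFilter instances (see TestUtils.java below)
that blank out the value after the key so the comparison skips it:

    // From the new testFilters() test below:
    TestUtils.CARDINALITY_FILTER.transform(" foo=bar cardinality=10.3K");
    // -> " foo=bar cardinality="
    TestUtils.ROW_SIZE_FILTER.transform(" row-size=10B cardinality=10.3K");
    // -> " row-size= cardinality=10.3K"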

Rebasing the tests was complicated by a bug in the error-matching code,
so this patch also fixes:

IMPALA-8023: Fix PlannerTest to handle error lines consistently

Now the error output written to the "save results" file matches the text
expected in the "golden" file -- no more special-case handling of error
lines.
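
Both the saved output and the comparison now render errors the same way
(see PlannerTestBase.java below):

    String actualErrorMsg = e.getClass().getSimpleName() + ": " + e.getMessage();
    // e.g. "AnalysisException: <message>"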

Testing:

* Added cardinality verification (see the example after this list).
* Reran all FE tests.
* Rebased all PlannerTest .test files.
* Adjusted the metadata/test_explain.py test to handle the changed
  EXPLAIN output.
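
Tests opt in to cardinality verification through the new
PlannerTestOption, as in PlannerTest.java below:

    runPlannerTestFile("joins",
        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));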

Change-Id: Ie9aa2d715b04cbb279aaffec8c5692686562d986
Reviewed-on: http://gerrit.cloudera.org:8080/12136
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a7ea86b7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a7ea86b7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a7ea86b7

Branch: refs/heads/master
Commit: a7ea86b768247ff5388174445e7c91736b99c2de
Parents: 3a3ab7f
Author: paul-rogers <prog...@cloudera.com>
Authored: Thu Dec 27 17:55:16 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Sat Jan 12 04:03:26 2019 +0000

----------------------------------------------------------------------
 .../apache/impala/analysis/PartitionSet.java    |    2 +-
 .../org/apache/impala/common/PrintUtils.java    |   74 +-
 .../org/apache/impala/planner/EmptySetNode.java |    2 +
 .../org/apache/impala/planner/ExchangeNode.java |   11 +
 .../impala/planner/HdfsPartitionPruner.java     |   12 +-
 .../org/apache/impala/planner/HdfsScanNode.java |   48 +-
 .../impala/planner/NestedLoopJoinNode.java      |    1 +
 .../org/apache/impala/planner/PlanNode.java     |   28 +-
 .../impala/planner/SingleNodePlanner.java       |   11 +-
 .../apache/impala/planner/CardinalityTest.java  |    9 +
 .../org/apache/impala/planner/PlannerTest.java  |   53 +-
 .../apache/impala/planner/PlannerTestBase.java  |   15 +-
 .../org/apache/impala/testutil/TestUtils.java   |   55 +-
 .../queries/PlannerTest/aggregation.test        |  269 +++-
 .../queries/PlannerTest/analytic-fns.test       |  423 +++++-
 .../PlannerTest/complex-types-file-formats.test |   37 +-
 .../queries/PlannerTest/conjunct-ordering.test  |   17 +
 .../queries/PlannerTest/constant-folding.test   |   38 +-
 .../PlannerTest/constant-propagation.test       |   49 +-
 .../queries/PlannerTest/constant.test           |    2 +
 .../queries/PlannerTest/data-source-tables.test |    9 +-
 .../queries/PlannerTest/ddl.test                |   48 +
 .../default-join-distr-mode-broadcast.test      |   13 +-
 .../default-join-distr-mode-shuffle.test        |   13 +-
 .../queries/PlannerTest/disable-codegen.test    |   38 +-
 .../PlannerTest/disable-preaggregations.test    |    8 +
 .../queries/PlannerTest/distinct-estimate.test  |   13 +
 .../queries/PlannerTest/distinct.test           |  123 ++
 .../queries/PlannerTest/empty.test              |   67 +-
 .../PlannerTest/fk-pk-join-detection.test       |   72 +-
 .../queries/PlannerTest/hbase.test              |   76 +-
 .../queries/PlannerTest/hdfs.test               |  226 +++-
 .../queries/PlannerTest/implicit-joins.test     |   84 ++
 .../queries/PlannerTest/inline-view-limit.test  |  109 ++
 .../queries/PlannerTest/inline-view.test        |  209 +++
 .../queries/PlannerTest/insert-sort-by.test     |   72 +
 .../queries/PlannerTest/insert.test             |  129 ++
 .../queries/PlannerTest/join-order.test         |  285 ++++
 .../queries/PlannerTest/joins.test              |  441 +++++++
 .../queries/PlannerTest/kudu-delete.test        |   14 +
 .../queries/PlannerTest/kudu-selectivity.test   |    6 +-
 .../queries/PlannerTest/kudu-update.test        |   24 +
 .../queries/PlannerTest/kudu-upsert.test        |   50 +
 .../queries/PlannerTest/kudu.test               |   82 +-
 .../queries/PlannerTest/lineage.test            |  192 ++-
 .../queries/PlannerTest/max-row-size.test       |  110 +-
 .../PlannerTest/mem-limit-broadcast-join.test   |    3 +
 .../PlannerTest/min-max-runtime-filters.test    |   22 +-
 .../queries/PlannerTest/mt-dop-validation.test  |   40 +-
 .../PlannerTest/multiple-distinct-limit.test    |   28 +
 .../multiple-distinct-materialization.test      |  141 ++
 .../multiple-distinct-predicates.test           |   60 +
 .../queries/PlannerTest/multiple-distinct.test  |  160 +++
 .../queries/PlannerTest/nested-collections.test |  575 +++++++-
 .../queries/PlannerTest/nested-loop-join.test   |   52 +
 .../queries/PlannerTest/order.test              |  298 +++++
 .../queries/PlannerTest/outer-joins.test        |  173 +++
 .../PlannerTest/parquet-filtering-disabled.test |   32 +-
 .../queries/PlannerTest/parquet-filtering.test  |   20 +-
 .../queries/PlannerTest/parquet-stats-agg.test  |  124 +-
 .../PlannerTest/partition-key-scans.test        |   57 +
 .../queries/PlannerTest/partition-pruning.test  |    1 +
 .../PlannerTest/predicate-propagation.test      |  235 +++-
 .../PlannerTest/resource-requirements.test      |  965 +++++++-------
 .../PlannerTest/runtime-filter-propagation.test |  284 +++-
 .../runtime-filter-query-options.test           |  113 ++
 .../PlannerTest/shuffle-by-distinct-exprs.test  |   55 +
 .../queries/PlannerTest/small-query-opt.test    |   53 +-
 .../PlannerTest/sort-expr-materialization.test  |   32 +-
 .../PlannerTest/spillable-buffer-sizing.test    |  192 +--
 .../queries/PlannerTest/subquery-rewrite.test   |  508 +++++++
 .../queries/PlannerTest/tablesample.test        |   12 +-
 .../PlannerTest/topn-bytes-limit-small.test     |   12 +
 .../queries/PlannerTest/topn-bytes-limit.test   |    6 +-
 .../queries/PlannerTest/topn.test               |   73 ++
 .../queries/PlannerTest/tpcds-all.test          | 1234 ++++++++++++++++++
 .../queries/PlannerTest/tpch-all.test           |  658 ++++++++++
 .../queries/PlannerTest/tpch-kudu.test          |  200 +++
 .../queries/PlannerTest/tpch-nested.test        |  701 +++++++++-
 .../queries/PlannerTest/tpch-views.test         |  200 +++
 .../queries/PlannerTest/union.test              |  913 ++++++++++++-
 .../queries/PlannerTest/values.test             |   12 +
 .../queries/PlannerTest/views.test              |   88 ++
 .../queries/PlannerTest/with-clause.test        |  138 ++
 .../queries/QueryTest/corrupt-stats.test        |   14 +-
 .../queries/QueryTest/explain-level1.test       |    3 +
 .../queries/QueryTest/explain-level2.test       |   30 +-
 .../queries/QueryTest/explain-level3.test       |   24 +-
 .../queries/QueryTest/stats-extrapolation.test  |   11 +-
 tests/metadata/test_explain.py                  |    6 +-
 90 files changed, 11078 insertions(+), 1149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
index 53a321f..e9972bb 100644
--- a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
+++ b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
@@ -86,7 +86,7 @@ public class PartitionSet extends PartitionSpecBase {
 
     try {
       HdfsPartitionPruner pruner = new HdfsPartitionPruner(desc);
-      partitions_ = pruner.prunePartitions(analyzer, transformedConjuncts, true);
+      partitions_ = pruner.prunePartitions(analyzer, transformedConjuncts, true).first;
     } catch (ImpalaException e) {
       if (e instanceof AnalysisException) throw (AnalysisException) e;
      throw new AnalysisException("Partition expr evaluation failed in the backend.", e);

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/common/PrintUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/PrintUtils.java b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
index f4814b5..2636914 100644
--- a/fe/src/main/java/org/apache/impala/common/PrintUtils.java
+++ b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
@@ -24,6 +24,8 @@ import static org.apache.impala.common.ByteUnits.PETABYTE;
 import static org.apache.impala.common.ByteUnits.TERABYTE;
 
 import java.text.DecimalFormat;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.text.WordUtils;
@@ -49,6 +51,73 @@ public class PrintUtils {
     return bytes + "B";
   }
 
+  public static final long KILO = 1000;
+  public static final long MEGA = KILO * 1000;
+  public static final long GIGA = MEGA * 1000;
+  public static final long TERA = GIGA * 1000;
+
+  /**
+   * Print a value using simple metric (power of 1000) units. Units are
+   * (none), K, M, G or T. Value has two digits past the decimal point.
+   */
+  public static String printMetric(long value) {
+    double result = value;
+    if (value >= TERA) return new DecimalFormat(".00T").format(result / TERA);
+    if (value >= GIGA) return new DecimalFormat(".00G").format(result / GIGA);
+    if (value >= MEGA) return new DecimalFormat(".00M").format(result / MEGA);
+    if (value >= KILO) return new DecimalFormat(".00K").format(result / KILO);
+    return Long.toString(value);
+  }
+
+  /**
+   * Pattern to use when searching for a metric-encoded value.
+   */
+  public static final String METRIC_REGEX = "(\\d+(?:.\\d+)?)([TGMK]?)";
+
+  /**
+   * Pattern to use when searching for or parsing a metric-encoded value.
+   */
+  public static final Pattern METRIC_PATTERN =
+      Pattern.compile(METRIC_REGEX, Pattern.CASE_INSENSITIVE);
+
+  /**
+   * Decode a value metric-encoded using {@link #printMetric(long)}.
+   * @param value metric-encoded string
+   * @return approximate numeric value, or -1 if the value is invalid
+   * (metric encoded strings can never be negative normally)
+   */
+  public static double decodeMetric(String value) {
+    Matcher m = METRIC_PATTERN.matcher(value);
+    if (! m.matches()) return -1;
+    return decodeMetric(m.group(1), m.group(2));
+  }
+
+  /**
+   * Decode a metric-encoded string already parsed into parts.
+   * @param valueStr numeric part of the value
+   * @param units units part of the value
+   * @return approximate numeric value
+   */
+  // Yes, "PrintUtils" is an odd place for a parse function, but
+  // best to keep the formatter and parser together.
+  public static double decodeMetric(String valueStr, String units) {
+    double value = Double.parseDouble(valueStr);
+    switch (units.toUpperCase()) {
+    case "":
+      return value;
+    case "K":
+      return value * KILO;
+    case "M":
+      return value * MEGA;
+    case "G":
+      return value * GIGA;
+    case "T":
+      return value * TERA;
+    default:
+      return -1;
+    }
+  }
+
   /**
    * Same as printBytes() except 0 decimal points are shown for MB and KB.
    */
@@ -65,9 +134,8 @@ public class PrintUtils {
     return bytes + "B";
   }
 
-  public static String printCardinality(String prefix, long cardinality) {
-    return prefix + "cardinality=" +
-        ((cardinality != -1) ? String.valueOf(cardinality) : "unavailable");
+  public static String printCardinality(long cardinality) {
+    return (cardinality != -1) ? printMetric(cardinality) : "unavailable";
   }
 
   public static String printNumHosts(String prefix, long numHosts) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
index 7b3ea33..55830c2 100644
--- a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
@@ -76,4 +76,6 @@ public class EmptySetNode extends PlanNode {
     msg.node_type = TPlanNodeType.EMPTY_SET_NODE;
   }
 
+  @Override
+  protected boolean displayCardinality(TExplainLevel detailLevel) { return false; }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index 356ae6b..a140dc2 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -173,6 +173,17 @@ public class ExchangeNode extends PlanNode {
     return output.toString();
   }
 
+  /**
+   * An Exchange simply moves rows over the network: its row width
+   * and cardinality are identical to its input. So, for standard
+   * level, there is no need to repeat these values. Retained in
+   * higher levels for backward compatibility.
+   */
+  @Override
+  protected boolean displayCardinality(TExplainLevel detailLevel) {
+    return detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal();
+  }
+
   @Override
   protected String getDisplayLabelDetail() {
     // For the non-fragmented explain levels, print the data partition

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java
index 9fb204a..e9deb44 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java
@@ -44,6 +44,7 @@ import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.PrunablePartition;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.Pair;
 import org.apache.impala.rewrite.BetweenToCompoundRule;
 import org.apache.impala.rewrite.ExprRewriter;
 import org.slf4j.Logger;
@@ -97,16 +98,20 @@ public class HdfsPartitionPruner {
 
   /**
    * Return a list of partitions left after applying the conjuncts. Please note
-   that conjuncts used for filtering will be removed from the list 'conjuncts'.
+   that conjuncts used for filtering will be removed from the list 'conjuncts' and
+   * returned as the second item in the returned Pair. These expressions can be
+   * shown in the EXPLAIN output.
+   *
    * If 'allowEmpty' is False, empty partitions are not returned.
    */
-  public List<? extends FeFsPartition> prunePartitions(
+  public Pair<List<? extends FeFsPartition>, List<Expr>> prunePartitions(
       Analyzer analyzer, List<Expr> conjuncts, boolean allowEmpty)
       throws ImpalaException {
    // Start with creating a collection of partition filters for the applicable conjuncts.
     List<HdfsPartitionFilter> partitionFilters = new ArrayList<>();
     // Conjuncts that can be evaluated from the partition key values.
     List<Expr> simpleFilterConjuncts = new ArrayList<>();
+    List<Expr> partitionConjuncts = new ArrayList<>();
 
     // Simple predicates (e.g. binary predicates of the form
     // <SlotRef> <op> <LiteralExpr>) can be used to derive lists
@@ -128,6 +133,7 @@ public class HdfsPartitionPruner {
         } else {
          partitionFilters.add(new HdfsPartitionFilter(clonedConjunct, tbl_, analyzer));
         }
+        partitionConjuncts.add(conjunct);
         it.remove();
       }
     }
@@ -168,7 +174,7 @@ public class HdfsPartitionPruner {
             }
           }));
     }
-    return results;
+    return new Pair<>(results, partitionConjuncts);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 4f9a961..cc31ce4 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -49,9 +49,9 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnStats;
-import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
@@ -68,8 +68,8 @@ import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TExpr;
-import org.apache.impala.thrift.THdfsFileSplit;
 import org.apache.impala.thrift.TFileSplitGeneratorSpec;
+import org.apache.impala.thrift.THdfsFileSplit;
 import org.apache.impala.thrift.THdfsScanNode;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPlanNode;
@@ -91,8 +91,6 @@ import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 /**
  * Scan of a single table.
@@ -261,10 +259,13 @@ public class HdfsScanNode extends ScanNode {
   // parquet::Statistics.
   private TupleDescriptor minMaxTuple_;
 
-  // Slot that is used to record the Parquet metatdata for the count(*) aggregation if
+  // Slot that is used to record the Parquet metadata for the count(*) aggregation if
   // this scan node has the count(*) optimization enabled.
   private SlotDescriptor countStarSlot_ = null;
 
+  // Conjuncts used to trim the set of partitions passed to this node.
+  // Used only to display EXPLAIN information.
+  private final List<Expr> partitionConjuncts_;
   /**
    * Construct a node to scan given data files into tuples described by 'desc',
    * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and
@@ -272,12 +273,14 @@ public class HdfsScanNode extends ScanNode {
    * class comments above for details.
    */
  public HdfsScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts,
-      List<? extends FeFsPartition> partitions, TableRef hdfsTblRef, AggregateInfo aggInfo) {
+      List<? extends FeFsPartition> partitions, TableRef hdfsTblRef,
+      AggregateInfo aggInfo, List<Expr> partConjuncts) {
     super(id, desc, "SCAN HDFS");
     Preconditions.checkState(desc.getTable() instanceof FeFsTable);
     tbl_ = (FeFsTable)desc.getTable();
     conjuncts_ = conjuncts;
     partitions_ = partitions;
+    partitionConjuncts_ = partConjuncts;
     sampleParams_ = hdfsTblRef.getSampleParams();
     replicaPreference_ = hdfsTblRef.getReplicaPreference();
     randomReplica_ = hdfsTblRef.getRandomReplica();
@@ -1224,20 +1227,27 @@ public class HdfsScanNode extends ScanNode {
     }
     output.append("]\n");
     if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
+      if (partitionConjuncts_ != null && !partitionConjuncts_.isEmpty()) {
+        output.append(detailPrefix)
+          .append(String.format("partition predicates: %s\n",
+              getExplainString(partitionConjuncts_, detailLevel)));
+      }
       if (tbl_.getNumClusteringCols() == 0) numPartitions_ = 1;
-      output.append(String.format("%spartitions=%s/%s files=%s size=%s", detailPrefix,
-          numPartitions_, table.getPartitions().size(), totalFiles_,
-          PrintUtils.printBytes(totalBytes_)));
-      output.append("\n");
+      output.append(detailPrefix)
+        .append(String.format("partitions=%d/%d files=%d size=%s\n",
+            numPartitions_, table.getPartitions().size(), totalFiles_,
+            PrintUtils.printBytes(totalBytes_)));
       if (!conjuncts_.isEmpty()) {
-        output.append(String.format("%spredicates: %s\n", detailPrefix,
+        output.append(detailPrefix)
+          .append(String.format("predicates: %s\n",
             getExplainString(conjuncts_, detailLevel)));
       }
       if (!collectionConjuncts_.isEmpty()) {
         for (Map.Entry<TupleDescriptor, List<Expr>> entry:
           collectionConjuncts_.entrySet()) {
           String alias = entry.getKey().getAlias();
-          output.append(String.format("%spredicates on %s: %s\n", detailPrefix, alias,
+          output.append(detailPrefix)
+            .append(String.format("predicates on %s: %s\n", alias,
               getExplainString(entry.getValue(), detailLevel)));
         }
       }
@@ -1255,14 +1265,16 @@ public class HdfsScanNode extends ScanNode {
       } else if (extrapolatedNumRows_ == -1) {
         extrapRows = "unavailable";
       }
-      output.append(String.format("%sextrapolated-rows=%s", detailPrefix, extrapRows));
+      output.append(detailPrefix)
+        .append(String.format("extrapolated-rows=%s", extrapRows));
       output.append(String.format(" max-scan-range-rows=%s",
           maxScanRangeNumRows_ == -1 ? "unavailable" : maxScanRangeNumRows_));
       output.append("\n");
       if (numScanRangesNoDiskIds_ > 0) {
-        output.append(String.format("%smissing disk ids: "
+        output.append(detailPrefix)
+          .append(String.format("missing disk ids: "
                 + "partitions=%s/%s files=%s/%s scan ranges %s/%s\n",
-            detailPrefix, numPartitionsNoDiskIds_, numPartitions_, numFilesNoDiskIds_,
+            numPartitionsNoDiskIds_, numPartitions_, numFilesNoDiskIds_,
             totalFiles_, numScanRangesNoDiskIds_,
            scanRangeSpecs_.getConcrete_rangesSize() + generatedScanRangeCount_));
       }
@@ -1283,10 +1295,12 @@ public class HdfsScanNode extends ScanNode {
       TupleDescriptor tupleDesc = entry.getKey();
       List<Expr> exprs = entry.getValue();
       if (tupleDesc == getTupleDesc()) {
-        output.append(String.format("%sparquet statistics predicates: %s\n", prefix,
+        output.append(prefix)
+        .append(String.format("parquet statistics predicates: %s\n",
             getExplainString(exprs, detailLevel)));
       } else {
-        output.append(String.format("%sparquet statistics predicates on %s: %s\n", prefix,
+        output.append(prefix)
+        .append(String.format("parquet statistics predicates on %s: %s\n",
             tupleDesc.getAlias(), getExplainString(exprs, detailLevel)));
       }
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
index 1ecd1c5..afa75e7 100644
--- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
@@ -30,6 +30,7 @@ import org.apache.impala.thrift.TNestedLoopJoinNode;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
+
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index a329dc2..520cd8f 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -320,6 +320,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
 
    // Output cardinality, cost estimates and tuple Ids only when explain plan level
     // is extended or above.
+    boolean displayCardinality = displayCardinality(detailLevel);
     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
       // Print resource profile.
       expBuilder.append(detailPrefix);
@@ -334,9 +335,18 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         expBuilder.append(tupleId.asInt() + nullIndicator);
         if (i + 1 != tupleIds_.size()) expBuilder.append(",");
       }
-      expBuilder.append(" row-size=" + PrintUtils.printBytes(Math.round(avgRowSize_)));
-      expBuilder.append(PrintUtils.printCardinality(" ", cardinality_));
-      expBuilder.append("\n");
+      expBuilder.append(displayCardinality ? " " : "\n");
+    }
+    // Output cardinality: in standard and above levels.
+    // In standard, on a line by itself (if wanted). In extended, on
+    // a line with tuple ids.
+    if (displayCardinality) {
+      if (detailLevel == TExplainLevel.STANDARD) expBuilder.append(detailPrefix);
+      expBuilder.append("row-size=")
+        .append(PrintUtils.printBytes(Math.round(avgRowSize_)))
+        .append(" cardinality=")
+        .append(PrintUtils.printCardinality(cardinality_))
+        .append("\n");
     }
 
     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
@@ -353,7 +363,6 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
       }
     }
 
-
    // Print the children. Do not traverse into the children of an Exchange node to
     // avoid crossing fragment boundaries.
     if (traverseChildren) {
@@ -386,6 +395,17 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   }
 
   /**
+   * Per-node setting whether to include cardinality in the node overview.
+   * Some nodes omit cardinality because either a) it is not needed
+   * (Empty set, Exchange), or b) it is printed by the node itself (HDFS scan.)
+   * @return true if cardinality should be included in the generic
+   * node details, false if it should be omitted.
+   */
+  protected boolean displayCardinality(TExplainLevel detailLevel) {
+    return detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal();
+  }
+
+  /**
    * Return the node-specific details.
    * Subclass should override this function.
    * Each line should be prefixed by detailPrefix.

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 53b224e..a31bb50 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -1280,7 +1280,9 @@ public class SingleNodePlanner {
    // Do partition pruning before deciding which slots to materialize because we might
     // end up removing some predicates.
     HdfsPartitionPruner pruner = new HdfsPartitionPruner(tupleDesc);
-    List<? extends FeFsPartition> partitions = pruner.prunePartitions(analyzer, conjuncts, false);
+    Pair<List<? extends FeFsPartition>, List<Expr>> pair =
+        pruner.prunePartitions(analyzer, conjuncts, false);
+    List<? extends FeFsPartition> partitions = pair.first;
 
     // Mark all slots referenced by the remaining conjuncts as materialized.
     analyzer.materializeSlots(conjuncts);
@@ -1323,9 +1325,9 @@ public class SingleNodePlanner {
       unionNode.init(analyzer);
       return unionNode;
     } else {
-      ScanNode scanNode =
+      HdfsScanNode scanNode =
          new HdfsScanNode(ctx_.getNextNodeId(), tupleDesc, conjuncts, partitions,
-              hdfsTblRef, aggInfo);
+              hdfsTblRef, aggInfo, pair.second);
       scanNode.init(analyzer);
       return scanNode;
     }
@@ -1409,7 +1411,6 @@ public class SingleNodePlanner {
     ((HBaseScanNode)scanNode).setKeyRanges(keyRanges);
     scanNode.addConjuncts(conjuncts);
     scanNode.init(analyzer);
-
     return scanNode;
   }
 
@@ -1424,7 +1425,7 @@ public class SingleNodePlanner {
   * - for outer joins: same type of conjuncts as inner joins, but only from the
    *   ON or USING clause
   * Predicates that are redundant based on equivalence classes are intentionally
-   * returneded by this function because the removal of redundant predicates and the
+   * returned by this function because the removal of redundant predicates and the
    * creation of new predicates for enforcing slot equivalences go hand-in-hand
    * (see analyzer.createEquivConjuncts()).
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
index da59860..3bbb903 100644
--- a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
@@ -93,6 +93,15 @@ public class CardinalityTest extends PlannerTestBase {
         "SELECT COUNT(*) FROM functional.alltypes GROUP BY bool_col", 2);
   }
 
+  @Test
+  public void testNullColumnJoinCardinality() throws ImpalaException {
+    // IMPALA-7565: Make sure there is no division by zero during cardinality calculation
+    // in a many to many join on null columns (ndv = 0).
+    String query = "select * from functional.nulltable t1 "
+        + "inner join [shuffle] functional.nulltable t2 on t1.d = t2.d";
+    checkCardinality(query, 1, 1);
+  }
+
   /**
    * Joins should multiply out cardinalities.
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index c8dbf1f..683f702 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -31,6 +31,7 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.Frontend.PlanCtx;
 import org.apache.impala.testutil.TestUtils;
+import org.apache.impala.testutil.TestUtils.IgnoreValueFilter;
 import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TJoinDistributionMode;
@@ -75,7 +76,8 @@ public class PlannerTest extends PlannerTestBase {
 
   @Test
   public void testEmpty() {
-    runPlannerTestFile("empty");
+    runPlannerTestFile("empty",
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
   @Test
@@ -163,29 +165,34 @@ public class PlannerTest extends PlannerTestBase {
 
   @Test
   public void testJoins() {
-    runPlannerTestFile("joins");
+    runPlannerTestFile("joins",
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
   @Test
   public void testJoinOrder() {
-    runPlannerTestFile("join-order");
+    runPlannerTestFile("join-order",
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
   @Test
   public void testOuterJoins() {
-    runPlannerTestFile("outer-joins");
+    runPlannerTestFile("outer-joins",
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
   @Test
   public void testImplicitJoins() {
-    runPlannerTestFile("implicit-joins");
+    runPlannerTestFile("implicit-joins",
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
   @Test
   public void testFkPkJoinDetection() {
     // The FK/PK detection result is included in EXTENDED or higher.
     runPlannerTestFile("fk-pk-join-detection",
-        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
   @Test
@@ -278,7 +285,8 @@ public class PlannerTest extends PlannerTestBase {
   public void testTpch() {
     runPlannerTestFile("tpch-all", "tpch",
         ImmutableSet.of(PlannerTestOption.INCLUDE_RESOURCE_HEADER,
-            PlannerTestOption.VALIDATE_RESOURCES));
+            PlannerTestOption.VALIDATE_RESOURCES,
+            PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
   @Test
@@ -298,7 +306,8 @@ public class PlannerTest extends PlannerTestBase {
   public void testTpchNested() {
     runPlannerTestFile("tpch-nested", "tpch_nested_parquet",
         ImmutableSet.of(PlannerTestOption.INCLUDE_RESOURCE_HEADER,
-            PlannerTestOption.VALIDATE_RESOURCES));
+            PlannerTestOption.VALIDATE_RESOURCES,
+            PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
   @Test
@@ -539,11 +548,16 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
-  public void testDefaultJoinDistributionMode() {
+  public void testDefaultJoinDistributionBroadcastMode() {
     TQueryOptions options = defaultQueryOptions();
     Preconditions.checkState(
        options.getDefault_join_distribution_mode() == TJoinDistributionMode.BROADCAST);
     runPlannerTestFile("default-join-distr-mode-broadcast", options);
+  }
+
+  @Test
+  public void testDefaultJoinDistributionShuffleMode() {
+    TQueryOptions options = defaultQueryOptions();
     options.setDefault_join_distribution_mode(TJoinDistributionMode.SHUFFLE);
     runPlannerTestFile("default-join-distr-mode-shuffle", options);
   }
@@ -707,12 +721,21 @@ public class PlannerTest extends PlannerTestBase {
         8 * 1024 * 1024);
   }
 
+  /**
+   * Verify that various expected-result filters work on a
+   * variety of sample input lines.
+   */
   @Test
-  public void testNullColumnJoinCardinality() throws ImpalaException {
-    // IMPALA-7565: Make sure there is no division by zero during cardinality calculation
-    // in a many to many join on null columns (ndv = 0).
-    String query = "select * from functional.nulltable t1 "
-        + "inner join [shuffle] functional.nulltable t2 on t1.d = t2.d";
-    checkCardinality(query, 1, 1);
+  public void testFilters() {
+    IgnoreValueFilter filter = TestUtils.CARDINALITY_FILTER;
+    assertEquals(" foo=bar cardinality=",
+        filter.transform(" foo=bar cardinality=10"));
+    assertEquals(" foo=bar cardinality=",
+        filter.transform(" foo=bar cardinality=10.3K"));
+    assertEquals(" foo=bar cardinality=",
+        filter.transform(" foo=bar cardinality=unavailable"));
+    filter = TestUtils.ROW_SIZE_FILTER;
+    assertEquals(" row-size= cardinality=10.3K",
+        filter.transform(" row-size=10B cardinality=10.3K"));
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 7db5d37..d9e168f 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -74,8 +74,8 @@ import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduScanToken;
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -357,7 +357,8 @@ public class PlannerTestBase extends FrontendTestBase {
 
   private void handleException(String query, String expectedErrorMsg,
       StringBuilder errorLog, StringBuilder actualOutput, Throwable e) {
-    actualOutput.append(e.toString() + "\n");
+    String actualErrorMsg = e.getClass().getSimpleName() + ": " + e.getMessage();
+    actualOutput.append(actualErrorMsg).append("\n");
     if (expectedErrorMsg == null) {
       // Exception is unexpected
       errorLog.append(String.format("Query:\n%s\nError Stack:\n%s\n", query,
@@ -365,7 +366,6 @@ public class PlannerTestBase extends FrontendTestBase {
     } else {
       // Compare actual and expected error messages.
       if (expectedErrorMsg != null && !expectedErrorMsg.isEmpty()) {
-        String actualErrorMsg = e.getClass().getSimpleName() + ": " + e.getMessage();
        if (!actualErrorMsg.toLowerCase().startsWith(expectedErrorMsg.toLowerCase())) {
           errorLog.append("query:\n" + query + "\nExpected error message: '"
              + expectedErrorMsg + "'\nActual error message: '" + actualErrorMsg + "'\n");
@@ -535,6 +535,10 @@ public class PlannerTestBase extends FrontendTestBase {
       if (!testOptions.contains(PlannerTestOption.VALIDATE_RESOURCES)) {
         resultFilters.addAll(TestUtils.RESOURCE_FILTERS);
       }
+      if (!testOptions.contains(PlannerTestOption.VALIDATE_CARDINALITY)) {
+        resultFilters.add(TestUtils.ROW_SIZE_FILTER);
+        resultFilters.add(TestUtils.CARDINALITY_FILTER);
+      }
       String planDiff = TestUtils.compareOutput(
          Lists.newArrayList(explainStr.split("\n")), expectedPlan, true, resultFilters);
       if (!planDiff.isEmpty()) {
@@ -804,6 +808,11 @@ public class PlannerTestBase extends FrontendTestBase {
    // ignore differences in resource values). Operator- and fragment-level resource
     // requirements are only included if EXTENDED_EXPLAIN is also enabled.
     VALIDATE_RESOURCES,
+    // Verify the row size and cardinality fields in the plan. Default is
+    // to ignore these values (for backward compatibility.) Turn this option
+    // on for tests that validate cardinality calculations: joins, scan
+    // cardinality, etc.
+    VALIDATE_CARDINALITY
   }
 
   protected void runPlannerTestFile(String testFile, TQueryOptions options) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
index 5e678d2..26412fb 100644
--- a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
+++ b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
@@ -16,6 +16,7 @@
 // under the License.
 
 package org.apache.impala.testutil;
+
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.text.SimpleDateFormat;
@@ -80,8 +81,10 @@ public class TestUtils {
 
     public PathFilter(String prefix) { filterKey_ = prefix; }
 
+    @Override
     public boolean matches(String input) { return input.contains(filterKey_); }
 
+    @Override
     public String transform(String input) {
       String result = input.replaceFirst(filterKey_, "");
       result = result.replaceAll(PATH_FILTER, " ");
@@ -109,8 +112,10 @@ public class TestUtils {
       this.valueRegex = valueRegex;
     }
 
+    @Override
     public boolean matches(String input) { return input.contains(keyPrefix); }
 
+    @Override
     public String transform(String input) {
       return input.replaceAll(keyPrefix + valueRegex, keyPrefix);
     }
@@ -121,6 +126,15 @@ public class TestUtils {
   public static final IgnoreValueFilter FILE_SIZE_FILTER =
       new IgnoreValueFilter("size", BYTE_VALUE_REGEX);
 
+  // Ignore the row-size=8B entries
+  public static final IgnoreValueFilter ROW_SIZE_FILTER =
+      new IgnoreValueFilter("row-size", "\\S+");
+
+  // Ignore cardinality=27.30K or cardinality=unavailable
+  // entries
+  public static final IgnoreValueFilter CARDINALITY_FILTER =
+      new IgnoreValueFilter("cardinality", "\\S+");
+
   // Ignore the exact estimated row count, which depends on the file sizes.
   static IgnoreValueFilter SCAN_RANGE_ROW_COUNT_FILTER =
       new IgnoreValueFilter("max-scan-range-rows", NUMBER_REGEX);
@@ -160,6 +174,7 @@ public class TestUtils {
     }
     int mismatch = -1; // line in actual w/ mismatch
     int maxLen = Math.min(actual.size(), expected.size());
+    outer:
     for (int i = 0; i < maxLen; ++i) {
       String expectedStr = expected.get(i).trim();
       String actualStr = actual.get(i);
@@ -192,33 +207,31 @@ public class TestUtils {
       }
 
       // do a whitespace-insensitive comparison
-      Scanner e = new Scanner(expectedStr);
-      Scanner a = new Scanner(actualStr);
-      while (a.hasNext() && e.hasNext()) {
-        if (containsPrefix) {
-          if (!a.next().contains(e.next())) {
-            mismatch = i;
-            break;
-          }
-        } else {
-          if (!a.next().equals(e.next())) {
+      try (Scanner e = new Scanner(expectedStr);
+           Scanner a = new Scanner(actualStr)) {
+        while (a.hasNext() && e.hasNext()) {
+          String aToken = a.next();
+          String eToken = e.next();
+          if (containsPrefix) {
+            if (!aToken.contains(eToken)) {
+              mismatch = i;
+              break outer;
+            }
+          } else if (!aToken.equals(eToken)) {
             mismatch = i;
-            break;
+            break outer;
           }
         }
-      }
-      if (mismatch != -1) {
-        break;
-      }
 
-      if (ignoreAfter) {
-        if (e.hasNext() && !a.hasNext()) {
+        if (ignoreAfter) {
+          if (e.hasNext() && !a.hasNext()) {
+            mismatch = i;
+            break outer;
+          }
+        } else if (a.hasNext() != e.hasNext()) {
           mismatch = i;
-          break;
+          break outer;
         }
-      } else if (a.hasNext() != e.hasNext()) {
-        mismatch = i;
-        break;
       }
     }
     if (mismatch == -1 && actual.size() < expected.size()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a7ea86b7/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
index cf5a78b..18230ab 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
@@ -7,22 +7,27 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*), count(tinyint_col), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col)
+|  row-size=34B cardinality=1
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=1B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: count:merge(*), count:merge(tinyint_col), min:merge(tinyint_col), max:merge(tinyint_col), sum:merge(tinyint_col), avg:merge(tinyint_col)
+|  row-size=34B cardinality=1
 |
 02:EXCHANGE [UNPARTITIONED]
 |
 01:AGGREGATE
 |  output: count(*), count(tinyint_col), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col)
+|  row-size=34B cardinality=1
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=1B cardinality=11.00K
 ====
 # with grouping
 select tinyint_col, bigint_col, count(*), min(tinyint_col), max(tinyint_col), sum(tinyint_col),
@@ -35,9 +40,11 @@ PLAN-ROOT SINK
 01:AGGREGATE [FINALIZE]
 |  output: count(*), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col)
 |  group by: bigint_col, tinyint_col
+|  row-size=35B cardinality=9.07K
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=9B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -46,15 +53,18 @@ PLAN-ROOT SINK
 03:AGGREGATE [FINALIZE]
 |  output: count:merge(*), min:merge(tinyint_col), max:merge(tinyint_col), sum:merge(tinyint_col), avg:merge(tinyint_col)
 |  group by: bigint_col, tinyint_col
+|  row-size=35B cardinality=9.07K
 |
 02:EXCHANGE [HASH(bigint_col,tinyint_col)]
 |
 01:AGGREGATE [STREAMING]
 |  output: count(*), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col)
 |  group by: bigint_col, tinyint_col
+|  row-size=35B cardinality=9.07K
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=9B cardinality=11.00K
 ====
 # avg substitution
 select avg(id)
@@ -66,30 +76,37 @@ PLAN-ROOT SINK
 |
 02:TOP-N [LIMIT=10]
 |  order by: avg(zip) ASC
+|  row-size=16B cardinality=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: avg(id), count(id), avg(zip)
 |  having: count(id) > 0
+|  row-size=24B cardinality=0
 |
 00:SCAN HDFS [functional.testtbl]
    partitions=1/1 files=0 size=0B
+   row-size=12B cardinality=0
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 02:TOP-N [LIMIT=10]
 |  order by: avg(zip) ASC
+|  row-size=16B cardinality=0
 |
 04:AGGREGATE [FINALIZE]
 |  output: avg:merge(id), count:merge(id), avg:merge(zip)
 |  having: count(id) > 0
+|  row-size=24B cardinality=0
 |
 03:EXCHANGE [UNPARTITIONED]
 |
 01:AGGREGATE
 |  output: avg(id), count(id), avg(zip)
+|  row-size=24B cardinality=0
 |
 00:SCAN HDFS [functional.testtbl]
    partitions=1/1 files=0 size=0B
+   row-size=12B cardinality=0
 ====
 # Test correct removal of redundant group-by expressions (IMPALA-817)
 select int_col + int_col, int_col * int_col
@@ -103,9 +120,11 @@ PLAN-ROOT SINK
 |  group by: int_col + int_col, int_col * int_col
 |  having: int_col * int_col < 0
 |  limit: 10
+|  row-size=16B cardinality=10
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=4B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -116,14 +135,17 @@ PLAN-ROOT SINK
 |  group by: int_col + int_col, int_col * int_col
 |  having: int_col * int_col < 0
 |  limit: 10
+|  row-size=16B cardinality=10
 |
 02:EXCHANGE [HASH(int_col + int_col,int_col * int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: int_col + int_col, int_col * int_col
+|  row-size=16B cardinality=11.00K
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=4B cardinality=11.00K
 ====
 # Tests that a having predicate triggers slot materialization (IMPALA-846).
 select count(*) from
@@ -138,17 +160,21 @@ PLAN-ROOT SINK
 |  output: count(*), count(t2.int_col), count(t1.bigint_col)
 |  group by: t1.tinyint_col, t2.smallint_col
 |  having: count(t2.int_col) = count(t1.bigint_col)
+|  row-size=27B cardinality=2
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.smallint_col = t2.smallint_col
 |  runtime filters: RF000 <- t2.smallint_col
+|  row-size=17B cardinality=5.84K
 |
 |--01:SCAN HDFS [functional.alltypestiny t2]
 |     partitions=4/4 files=4 size=460B
+|     row-size=6B cardinality=8
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> t1.smallint_col
+   row-size=11B cardinality=7.30K
 ====
 # Tests proper slot materialization of agg-tuple slots for avg (IMP-1271).
 # 't.x > 10' is picked up as an unassigned conjunct, and not as a binding
@@ -165,9 +191,11 @@ PLAN-ROOT SINK
 |  output: avg(bigint_col)
 |  group by: int_col
 |  having: avg(bigint_col) > 10
+|  row-size=12B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=12B cardinality=7.30K
 ====
 # test distributed aggregation over unions (IMPALA-831)
 # non-distinct agg without grouping over a union
@@ -182,35 +210,44 @@ PLAN-ROOT SINK
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  limit: 10
+|  row-size=8B cardinality=1
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=0B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=0B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=0B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 05:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
 |  limit: 10
+|  row-size=8B cardinality=1
 |
 04:EXCHANGE [UNPARTITIONED]
 |
 03:AGGREGATE
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=0B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=0B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=0B cardinality=7.30K
 ====
 # non-distinct agg with grouping over a union
 select count(*) from
@@ -226,15 +263,19 @@ PLAN-ROOT SINK
 |  output: count(*)
 |  group by: bigint_col
 |  limit: 10
+|  row-size=16B cardinality=10
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=8B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=8B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=8B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -245,21 +286,26 @@ PLAN-ROOT SINK
 |  output: count:merge(*)
 |  group by: t.bigint_col
 |  limit: 10
+|  row-size=16B cardinality=10
 |
 04:EXCHANGE [HASH(t.bigint_col)]
 |
 03:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: bigint_col
+|  row-size=16B cardinality=20
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=8B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=8B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=8B cardinality=7.30K
 ====
 # distinct agg without grouping over a union
 select count(distinct int_col)
@@ -274,46 +320,58 @@ PLAN-ROOT SINK
 04:AGGREGATE [FINALIZE]
 |  output: count(int_col)
 |  limit: 10
+|  row-size=8B cardinality=1
 |
 03:AGGREGATE
 |  group by: int_col
+|  row-size=4B cardinality=20
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=4B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=4B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 08:AGGREGATE [FINALIZE]
 |  output: count:merge(int_col)
 |  limit: 10
+|  row-size=8B cardinality=1
 |
 07:EXCHANGE [UNPARTITIONED]
 |
 04:AGGREGATE
 |  output: count(int_col)
+|  row-size=8B cardinality=1
 |
 06:AGGREGATE
 |  group by: int_col
+|  row-size=4B cardinality=20
 |
 05:EXCHANGE [HASH(int_col)]
 |
 03:AGGREGATE [STREAMING]
 |  group by: int_col
+|  row-size=4B cardinality=20
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=4B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=4B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ====
 # distinct agg with grouping over a union
 select count(distinct int_col)
@@ -330,18 +388,23 @@ PLAN-ROOT SINK
 |  output: count(int_col)
 |  group by: t.bigint_col
 |  limit: 10
+|  row-size=16B cardinality=10
 |
 03:AGGREGATE
 |  group by: bigint_col, int_col
+|  row-size=12B cardinality=400
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=12B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=12B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=12B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -352,29 +415,36 @@ PLAN-ROOT SINK
 |  output: count:merge(int_col)
 |  group by: t.bigint_col
 |  limit: 10
+|  row-size=16B cardinality=10
 |
 07:EXCHANGE [HASH(t.bigint_col)]
 |
 04:AGGREGATE [STREAMING]
 |  output: count(int_col)
 |  group by: t.bigint_col
+|  row-size=16B cardinality=10
 |
 06:AGGREGATE
 |  group by: t.bigint_col, int_col
+|  row-size=12B cardinality=400
 |
 05:EXCHANGE [HASH(t.bigint_col,int_col)]
 |
 03:AGGREGATE [STREAMING]
 |  group by: bigint_col, int_col
+|  row-size=12B cardinality=400
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=12B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=12B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=12B cardinality=7.30K
 ====
 # mixed distinct and non-distinct agg without grouping over a union
 select count(smallint_col), count(distinct int_col)
@@ -389,49 +459,61 @@ PLAN-ROOT SINK
 04:AGGREGATE [FINALIZE]
 |  output: count(int_col), count:merge(smallint_col)
 |  limit: 10
+|  row-size=16B cardinality=1
 |
 03:AGGREGATE
 |  output: count(smallint_col)
 |  group by: int_col
+|  row-size=12B cardinality=20
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=6B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=6B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=6B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 08:AGGREGATE [FINALIZE]
 |  output: count:merge(int_col), count:merge(smallint_col)
 |  limit: 10
+|  row-size=16B cardinality=1
 |
 07:EXCHANGE [UNPARTITIONED]
 |
 04:AGGREGATE
 |  output: count(int_col), count:merge(smallint_col)
+|  row-size=16B cardinality=1
 |
 06:AGGREGATE
 |  output: count:merge(smallint_col)
 |  group by: int_col
+|  row-size=12B cardinality=20
 |
 05:EXCHANGE [HASH(int_col)]
 |
 03:AGGREGATE [STREAMING]
 |  output: count(smallint_col)
 |  group by: int_col
+|  row-size=12B cardinality=20
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=6B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=6B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=6B cardinality=7.30K
 ====
 # mixed distinct and non-distinct agg with grouping over a union
 select count(smallint_col), count(distinct int_col)
@@ -448,19 +530,24 @@ PLAN-ROOT SINK
 |  output: count(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
 |  limit: 10
+|  row-size=24B cardinality=10
 |
 03:AGGREGATE
 |  output: count(smallint_col)
 |  group by: bigint_col, int_col
+|  row-size=20B cardinality=400
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=14B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=14B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=14B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -471,31 +558,38 @@ PLAN-ROOT SINK
 |  output: count:merge(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
 |  limit: 10
+|  row-size=24B cardinality=10
 |
 07:EXCHANGE [HASH(t.bigint_col)]
 |
 04:AGGREGATE [STREAMING]
 |  output: count(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
+|  row-size=24B cardinality=10
 |
 06:AGGREGATE
 |  output: count:merge(smallint_col)
 |  group by: t.bigint_col, int_col
+|  row-size=20B cardinality=400
 |
 05:EXCHANGE [HASH(t.bigint_col,int_col)]
 |
 03:AGGREGATE [STREAMING]
 |  output: count(smallint_col)
 |  group by: bigint_col, int_col
+|  row-size=20B cardinality=400
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=14B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=14B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=14B cardinality=7.30K
 ====
 # mixed distinct and non-distinct agg with grouping over a union distinct
 select count(smallint_col), count(distinct int_col)
@@ -512,22 +606,28 @@ PLAN-ROOT SINK
 |  output: count(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
 |  limit: 10
+|  row-size=24B cardinality=10
 |
 04:AGGREGATE
 |  output: count(smallint_col)
 |  group by: bigint_col, int_col
+|  row-size=20B cardinality=400
 |
 03:AGGREGATE [FINALIZE]
 |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+|  row-size=89B cardinality=7.40K
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=89B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=89B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=89B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -538,39 +638,48 @@ PLAN-ROOT SINK
 |  output: count:merge(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
 |  limit: 10
+|  row-size=24B cardinality=10
 |
 10:EXCHANGE [HASH(t.bigint_col)]
 |
 05:AGGREGATE [STREAMING]
 |  output: count(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
+|  row-size=24B cardinality=10
 |
 09:AGGREGATE
 |  output: count:merge(smallint_col)
 |  group by: t.bigint_col, int_col
+|  row-size=20B cardinality=400
 |
 08:EXCHANGE [HASH(t.bigint_col,int_col)]
 |
 04:AGGREGATE [STREAMING]
 |  output: count(smallint_col)
 |  group by: bigint_col, int_col
+|  row-size=20B cardinality=400
 |
 07:AGGREGATE [FINALIZE]
 |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+|  row-size=89B cardinality=7.40K
 |
 06:EXCHANGE [HASH(id,bool_col,tinyint_col,smallint_col,int_col,bigint_col,float_col,double_col,date_string_col,string_col,timestamp_col,year,month)]
 |
 03:AGGREGATE [STREAMING]
 |  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+|  row-size=89B cardinality=7.40K
 |
 00:UNION
 |  pass-through-operands: all
+|  row-size=89B cardinality=7.40K
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=89B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=89B cardinality=7.30K
 ====
 # Mixed distinct and non-distinct agg with intermediate type different from input type
 # Regression test for IMPALA-5251 to exercise validateMergeAggFn() in FunctionCallExpr.
@@ -581,36 +690,44 @@ PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: count(l_partkey), avg:merge(l_quantity), ndv:merge(l_discount)
+|  row-size=24B cardinality=1
 |
 01:AGGREGATE
 |  output: avg(l_quantity), ndv(l_discount)
 |  group by: l_partkey
+|  row-size=24B cardinality=200.52K
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.92MB
+   partitions=1/1 files=3 size=193.60MB
+   row-size=24B cardinality=6.00M
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: count:merge(l_partkey), avg:merge(l_quantity), ndv:merge(l_discount)
+|  row-size=24B cardinality=1
 |
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
 |  output: count(l_partkey), avg:merge(l_quantity), ndv:merge(l_discount)
+|  row-size=24B cardinality=1
 |
 04:AGGREGATE
 |  output: avg:merge(l_quantity), ndv:merge(l_discount)
 |  group by: l_partkey
+|  row-size=24B cardinality=200.52K
 |
 03:EXCHANGE [HASH(l_partkey)]
 |
 01:AGGREGATE [STREAMING]
 |  output: avg(l_quantity), ndv(l_discount)
 |  group by: l_partkey
+|  row-size=24B cardinality=200.52K
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.92MB
+   partitions=1/1 files=3 size=193.60MB
+   row-size=24B cardinality=6.00M
 ====
 # test that aggregations are not placed below an unpartitioned exchange with a limit
 select count(*) from (select * from functional.alltypes limit 10) t
@@ -619,15 +736,18 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    limit: 10
+   row-size=0B cardinality=10
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 02:EXCHANGE [UNPARTITIONED]
 |  limit: 10
@@ -635,6 +755,7 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    limit: 10
+   row-size=0B cardinality=10
 ====
 # test that aggregations are not placed below an unpartitioned exchange with a limit
 select count(*) from
@@ -646,21 +767,26 @@ PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 00:UNION
 |  pass-through-operands: all
 |  limit: 10
+|  row-size=0B cardinality=10
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=0B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=0B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
+|  row-size=8B cardinality=1
 |
 04:EXCHANGE [UNPARTITIONED]
 |  limit: 10
@@ -668,12 +794,15 @@ PLAN-ROOT SINK
 00:UNION
 |  pass-through-operands: all
 |  limit: 10
+|  row-size=0B cardinality=10
 |
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=0B cardinality=100
 |
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+   row-size=0B cardinality=7.30K
 ====
 # test that limits are applied at the top-level merge aggregation node for non-grouping
 # distinct aggregation (IMPALA-1802)
@@ -690,66 +819,82 @@ PLAN-ROOT SINK
 06:AGGREGATE [FINALIZE]
 |  output: count(cnt)
 |  limit: 1
+|  row-size=8B cardinality=1
 |
 05:AGGREGATE
 |  group by: count(t1.id)
+|  row-size=8B cardinality=1
 |
 04:AGGREGATE [FINALIZE]
 |  output: count(t1.id)
 |  limit: 10
+|  row-size=8B cardinality=1
 |
 03:AGGREGATE
 |  group by: t1.id
+|  row-size=4B cardinality=9
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: t1.id = t2.id
 |  runtime filters: RF000 <- t2.id
+|  row-size=8B cardinality=9
 |
 |--01:SCAN HDFS [functional.alltypestiny t2]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.id
+   row-size=4B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: count(cnt)
 |  limit: 1
+|  row-size=8B cardinality=1
 |
 05:AGGREGATE
 |  group by: count(t1.id)
+|  row-size=8B cardinality=1
 |
 11:AGGREGATE [FINALIZE]
 |  output: count:merge(t1.id)
 |  limit: 10
+|  row-size=8B cardinality=1
 |
 10:EXCHANGE [UNPARTITIONED]
 |
 04:AGGREGATE
 |  output: count(t1.id)
+|  row-size=8B cardinality=1
 |
 09:AGGREGATE
 |  group by: t1.id
+|  row-size=4B cardinality=9
 |
 08:EXCHANGE [HASH(t1.id)]
 |
 03:AGGREGATE [STREAMING]
 |  group by: t1.id
+|  row-size=4B cardinality=9
 |
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: t1.id = t2.id
 |  runtime filters: RF000 <- t2.id
+|  row-size=8B cardinality=9
 |
 |--07:EXCHANGE [BROADCAST]
 |  |
 |  01:SCAN HDFS [functional.alltypestiny t2]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.id
+   row-size=4B cardinality=11.00K
 ====
 # IMPALA-2089: Tests correct elimination of redundant predicates.
 # The equivalences between inline-view slots are enforced inside the inline-view plan.
@@ -768,10 +913,12 @@ PLAN-ROOT SINK
 01:AGGREGATE [FINALIZE]
 |  group by: tinyint_col, smallint_col, int_col + int_col, coalesce(bigint_col, year)
 |  having: int_col + int_col = coalesce(bigint_col, year), smallint_col = int_col + int_col
+|  row-size=19B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    predicates: functional.alltypes.tinyint_col = functional.alltypes.smallint_col
+   row-size=19B cardinality=730
 ====
 # IMPALA-1917: Test NULL literals inside inline view with grouping aggregation.
 select cnt from
@@ -784,9 +931,11 @@ PLAN-ROOT SINK
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: bool_col, CAST(NULL AS INT)
+|  row-size=13B cardinality=2
 |
 00:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
+   row-size=1B cardinality=8
 ====
 # IMPALA-1917: Test NULL literals inside inline view with grouping aggregation.
 select cnt from
@@ -799,12 +948,15 @@ PLAN-ROOT SINK
 02:AGGREGATE [FINALIZE]
 |  output: count(int_col)
 |  group by: bool_col, NULL
+|  row-size=10B cardinality=2
 |
 01:AGGREGATE
 |  group by: bool_col, NULL, int_col
+|  row-size=6B cardinality=4
 |
 00:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
+   row-size=5B cardinality=8
 ====
 # test simple group_concat with distinct
 select group_concat(distinct string_col) from functional.alltypesagg
@@ -813,33 +965,41 @@ PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col)
+|  row-size=12B cardinality=1
 |
 01:AGGREGATE
 |  group by: string_col
+|  row-size=15B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=15B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: group_concat:merge(string_col)
+|  row-size=12B cardinality=1
 |
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
 |  output: group_concat(string_col)
+|  row-size=12B cardinality=1
 |
 04:AGGREGATE
 |  group by: string_col
+|  row-size=15B cardinality=963
 |
 03:EXCHANGE [HASH(string_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: string_col
+|  row-size=15B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=15B cardinality=11.00K
 ====
 # test group_concat and a group by
 select day, group_concat(distinct string_col)
@@ -851,25 +1011,31 @@ PLAN-ROOT SINK
 03:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col)
 |  group by: day
+|  row-size=16B cardinality=11
 |
 02:AGGREGATE
 |  group by: day, string_col
+|  row-size=19B cardinality=1.10K
 |
 01:TOP-N [LIMIT=99999]
 |  order by: id ASC
+|  row-size=23B cardinality=1.10K
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
    predicates: day = id % 100
+   row-size=23B cardinality=1.10K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col)
 |  group by: day
+|  row-size=16B cardinality=11
 |
 02:AGGREGATE
 |  group by: day, string_col
+|  row-size=19B cardinality=1.10K
 |
 04:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: id ASC
@@ -877,10 +1043,12 @@ PLAN-ROOT SINK
 |
 01:TOP-N [LIMIT=99999]
 |  order by: id ASC
+|  row-size=23B cardinality=1.10K
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
    predicates: day = id % 100
+   row-size=23B cardinality=1.10K
 ====
 # test group_concat with distinct together with another distinct aggregate function
 select count(distinct cast(timestamp_col as string)),
@@ -892,12 +1060,15 @@ PLAN-ROOT SINK
 02:AGGREGATE [FINALIZE]
 |  output: count(CAST(timestamp_col AS STRING)), group_concat(CAST(timestamp_col AS STRING))
 |  group by: year
+|  row-size=24B cardinality=1
 |
 01:AGGREGATE
 |  group by: year, CAST(timestamp_col AS STRING)
+|  row-size=20B cardinality=10.21K
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=20B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -906,23 +1077,28 @@ PLAN-ROOT SINK
 06:AGGREGATE [FINALIZE]
 |  output: count:merge(CAST(timestamp_col AS STRING)), group_concat:merge(CAST(timestamp_col AS STRING))
 |  group by: year
+|  row-size=24B cardinality=1
 |
 05:EXCHANGE [HASH(year)]
 |
 02:AGGREGATE [STREAMING]
 |  output: count(CAST(timestamp_col AS STRING)), group_concat(CAST(timestamp_col AS STRING))
 |  group by: year
+|  row-size=24B cardinality=1
 |
 04:AGGREGATE
 |  group by: year, CAST(timestamp_col AS STRING)
+|  row-size=20B cardinality=10.21K
 |
 03:EXCHANGE [HASH(year,CAST(timestamp_col AS STRING))]
 |
 01:AGGREGATE [STREAMING]
 |  group by: year, CAST(timestamp_col AS STRING)
+|  row-size=20B cardinality=10.21K
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=20B cardinality=11.00K
 ====
 # test group_concat distinct with other non-distinct aggregate functions
  select group_concat(distinct string_col), count(*) from functional.alltypesagg
@@ -931,36 +1107,44 @@ PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col), count:merge(*)
+|  row-size=20B cardinality=1
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: string_col
+|  row-size=23B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=15B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: group_concat:merge(string_col), count:merge(*)
+|  row-size=20B cardinality=1
 |
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
 |  output: group_concat(string_col), count:merge(*)
+|  row-size=20B cardinality=1
 |
 04:AGGREGATE
 |  output: count:merge(*)
 |  group by: string_col
+|  row-size=23B cardinality=963
 |
 03:EXCHANGE [HASH(string_col)]
 |
 01:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: string_col
+|  row-size=23B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=15B cardinality=11.00K
 ====
 # test group_concat distinct with other aggregate functions, with custom separator
 select group_concat(distinct string_col, '-'), sum(int_col), count(distinct string_col)
@@ -970,36 +1154,44 @@ PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col, '-'), count(string_col), sum:merge(int_col)
+|  row-size=28B cardinality=1
 |
 01:AGGREGATE
 |  output: sum(int_col)
 |  group by: string_col
+|  row-size=23B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=19B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: group_concat:merge(string_col, '-'), count:merge(string_col), sum:merge(int_col)
+|  row-size=28B cardinality=1
 |
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
 |  output: group_concat(string_col, '-'), count(string_col), sum:merge(int_col)
+|  row-size=28B cardinality=1
 |
 04:AGGREGATE
 |  output: sum:merge(int_col)
 |  group by: string_col
+|  row-size=23B cardinality=963
 |
 03:EXCHANGE [HASH(string_col)]
 |
 01:AGGREGATE [STREAMING]
 |  output: sum(int_col)
 |  group by: string_col
+|  row-size=23B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=19B cardinality=11.00K
 ====
 # test group_concat distinct with other aggregate functions, with custom separator
 # and a group by
@@ -1012,13 +1204,16 @@ PLAN-ROOT SINK
 02:AGGREGATE [FINALIZE]
 |  output: count(date_string_col), group_concat(date_string_col, '-'), count:merge(*)
 |  group by: month, year
+|  row-size=36B cardinality=1
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: month, year, date_string_col
+|  row-size=36B cardinality=10
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=28B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -1027,25 +1222,30 @@ PLAN-ROOT SINK
 06:AGGREGATE [FINALIZE]
 |  output: count:merge(date_string_col), group_concat:merge(date_string_col, '-'), count:merge(*)
 |  group by: month, year
+|  row-size=36B cardinality=1
 |
 05:EXCHANGE [HASH(month,year)]
 |
 02:AGGREGATE [STREAMING]
 |  output: count(date_string_col), group_concat(date_string_col, '-'), count:merge(*)
 |  group by: month, year
+|  row-size=36B cardinality=1
 |
 04:AGGREGATE
 |  output: count:merge(*)
 |  group by: month, year, date_string_col
+|  row-size=36B cardinality=10
 |
 03:EXCHANGE [HASH(month,year,date_string_col)]
 |
 01:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: month, year, date_string_col
+|  row-size=36B cardinality=10
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=28B cardinality=11.00K
 ====
 # test multiple group_concat distinct, each with a different separator
 select group_concat(distinct string_col), group_concat(distinct string_col, '-'),
@@ -1055,33 +1255,41 @@ PLAN-ROOT SINK
 |
 02:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col), group_concat(string_col, '-'), group_concat(string_col, '---')
+|  row-size=36B cardinality=1
 |
 01:AGGREGATE
 |  group by: string_col
+|  row-size=15B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=15B cardinality=11.00K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
 |  output: group_concat:merge(string_col), group_concat:merge(string_col, '-'), group_concat:merge(string_col, '---')
+|  row-size=36B cardinality=1
 |
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
 |  output: group_concat(string_col), group_concat(string_col, '-'), group_concat(string_col, '---')
+|  row-size=36B cardinality=1
 |
 04:AGGREGATE
 |  group by: string_col
+|  row-size=15B cardinality=963
 |
 03:EXCHANGE [HASH(string_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: string_col
+|  row-size=15B cardinality=963
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
+   row-size=15B cardinality=11.00K
 ====
 # IMPALA-852: Aggregation only in the HAVING clause.
 select 1 from functional.alltypestiny having count(*) > 0
@@ -1091,9 +1299,11 @@ PLAN-ROOT SINK
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  having: count(*) > 0
+|  row-size=8B cardinality=0
 |
 00:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
+   row-size=0B cardinality=8
 ====
 # Grouping aggregation where input is partitioned on grouping expr.
 # Planner should not redundantly repartition the data that was already partitioned on
@@ -1115,22 +1325,26 @@ PLAN-ROOT SINK
 |  group by: c_custkey
 |  having: count(*) < 150000
 |  limit: 1000000
+|  row-size=16B cardinality=15.00K
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: c_custkey = o_custkey
 |  runtime filters: RF000 <- o_custkey
+|  row-size=18B cardinality=91.47K
 |
 |--05:EXCHANGE [HASH(o_custkey)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.orders]
-|     partitions=1/1 files=2 size=54.20MB
+|     partitions=1/1 files=2 size=54.07MB
+|     row-size=8B cardinality=1.50M
 |
 04:EXCHANGE [HASH(c_custkey)]
 |
 00:SCAN HDFS [tpch_parquet.customer]
-   partitions=1/1 files=1 size=12.34MB
+   partitions=1/1 files=1 size=12.31MB
    predicates: c_nationkey = 16
    runtime filters: RF000 -> c_custkey
+   row-size=10B cardinality=6.00K
 ====
 # Distinct aggregation where input is partitioned on distinct expr.
 # Planner should not redundantly repartition the data that was already partitioned on
@@ -1147,29 +1361,35 @@ PLAN-ROOT SINK
 |  output: count:merge(c_custkey)
 |  having: count(c_custkey) > 50
 |  limit: 50
+|  row-size=8B cardinality=0
 |
 07:EXCHANGE [UNPARTITIONED]
 |
 04:AGGREGATE
 |  output: count(c_custkey)
+|  row-size=8B cardinality=0
 |
 03:AGGREGATE
 |  group by: c_custkey
+|  row-size=8B cardinality=150.00K
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: o_custkey = c_custkey
 |  runtime filters: RF000 <- c_custkey
+|  row-size=16B cardinality=1.50M
 |
 |--06:EXCHANGE [HASH(c_custkey)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.customer]
-|     partitions=1/1 files=1 size=12.34MB
+|     partitions=1/1 files=1 size=12.31MB
+|     row-size=8B cardinality=150.00K
 |
 05:EXCHANGE [HASH(o_custkey)]
 |
 00:SCAN HDFS [tpch_parquet.orders]
-   partitions=1/1 files=2 size=54.20MB
+   partitions=1/1 files=2 size=54.07MB
    runtime filters: RF000 -> o_custkey
+   row-size=8B cardinality=1.50M
 ====
 # Distinct grouping aggregation where input is partitioned on distinct and grouping exprs.
 # Planner should not redundantly repartition the data that was already partitioned on
@@ -1185,30 +1405,36 @@ PLAN-ROOT SINK
 08:AGGREGATE [FINALIZE]
 |  output: count:merge(c_custkey)
 |  group by: c_custkey
+|  row-size=16B cardinality=150.00K
 |
 07:EXCHANGE [HASH(c_custkey)]
 |
 04:AGGREGATE [STREAMING]
 |  output: count(c_custkey)
 |  group by: c_custkey
+|  row-size=16B cardinality=150.00K
 |
 03:AGGREGATE
 |  group by: c_custkey, c_custkey
+|  row-size=16B cardinality=1.50M
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: o_custkey = c_custkey
 |  runtime filters: RF000 <- c_custkey
+|  row-size=16B cardinality=1.50M
 |
 |--06:EXCHANGE [HASH(c_custkey)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.customer]
-|     partitions=1/1 files=1 size=12.34MB
+|     partitions=1/1 files=1 size=12.31MB
+|     row-size=8B cardinality=150.00K
 |
 05:EXCHANGE [HASH(o_custkey)]
 |
 00:SCAN HDFS [tpch_parquet.orders]
-   partitions=1/1 files=2 size=54.20MB
+   partitions=1/1 files=2 size=54.07MB
    runtime filters: RF000 -> o_custkey
+   row-size=8B cardinality=1.50M
 ====
 # Complex aggregation when two joins and an agg end up in same fragment.
 select l_orderkey, l_returnflag, count(*) from (
@@ -1233,31 +1459,37 @@ PLAN-ROOT SINK
 |  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_returnflag
 |  having: count(*) > 10
 |  limit: 10
+|  row-size=29B cardinality=10
 |
 04:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: o_custkey = c_custkey, o_comment = c_phone
 |  runtime filters: RF000 <- c_custkey, RF001 <- c_phone
+|  row-size=160B cardinality=607.19K
 |
 |--08:EXCHANGE [BROADCAST]
 |  |
 |  02:SCAN HDFS [tpch_parquet.customer]
-|     partitions=1/1 files=1 size=12.34MB
+|     partitions=1/1 files=1 size=12.31MB
+|     row-size=35B cardinality=150.00K
 |
 03:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: l_orderkey = o_orderkey, l_returnflag = o_clerk
 |  runtime filters: RF004 <- o_orderkey, RF005 <- o_clerk
+|  row-size=125B cardinality=5.76M
 |
 |--07:EXCHANGE [HASH(o_orderkey,o_clerk)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.orders]
-|     partitions=1/1 files=2 size=54.20MB
+|     partitions=1/1 files=2 size=54.07MB
 |     runtime filters: RF000 -> o_custkey, RF001 -> o_comment
+|     row-size=104B cardinality=1.50M
 |
 06:EXCHANGE [HASH(l_orderkey,l_returnflag)]
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.92MB
+   partitions=1/1 files=3 size=193.60MB
    runtime filters: RF004 -> l_orderkey, RF005 -> l_returnflag
+   row-size=21B cardinality=6.00M
 ====
 # IMPALA-4263: Grouping agg needs a merge step because the grouping exprs reference a
 # tuple that is made nullable in the join fragment.
@@ -1274,25 +1506,30 @@ PLAN-ROOT SINK
 07:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
 |  group by: t2.id
+|  row-size=12B cardinality=99
 |
 06:EXCHANGE [HASH(t2.id)]
 |
 03:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: t2.id
+|  row-size=12B cardinality=99
 |
 02:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
 |  hash predicates: t1.id = t2.id
+|  row-size=8B cardinality=7.30K
 |
 |--05:EXCHANGE [HASH(t2.id)]
 |  |
 |  01:SCAN HDFS [functional.alltypessmall t2]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=4B cardinality=100
 |
 04:EXCHANGE [HASH(t1.id)]
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ====
 # IMPALA-4263: Grouping agg is placed in the join fragment and has no merge step.
 select /* +straight_join */ t1.id, count(*)
@@ -1308,19 +1545,23 @@ PLAN-ROOT SINK
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: t1.id
+|  row-size=12B cardinality=7.30K
 |
 02:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
 |  hash predicates: t1.id = t2.id
+|  row-size=8B cardinality=7.30K
 |
 |--05:EXCHANGE [HASH(t2.id)]
 |  |
 |  01:SCAN HDFS [functional.alltypessmall t2]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=4B cardinality=100
 |
 04:EXCHANGE [HASH(t1.id)]
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ====
 # IMPALA-4263: Grouping agg is placed in the second join fragment and has no merge step.
 # The grouping exprs reference a nullable tuple (t2), but that tuple is made nullable in
@@ -1341,27 +1582,33 @@ PLAN-ROOT SINK
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: t2.id
+|  row-size=12B cardinality=99
 |
 04:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
 |  hash predicates: t2.id = t3.id
+|  row-size=16B cardinality=73.00K
 |
 |--09:EXCHANGE [HASH(t3.id)]
 |  |
 |  02:SCAN HDFS [functional.alltypestiny t3]
 |     partitions=4/4 files=4 size=460B
+|     row-size=4B cardinality=8
 |
 08:EXCHANGE [HASH(t2.id)]
 |
 03:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]
 |  hash predicates: t1.int_col = t2.int_col
+|  row-size=12B cardinality=73.00K
 |
 |--07:EXCHANGE [HASH(t2.int_col)]
 |  |
 |  01:SCAN HDFS [functional.alltypessmall t2]
 |     partitions=4/4 files=4 size=6.32KB
+|     row-size=8B cardinality=100
 |
 06:EXCHANGE [HASH(t1.int_col)]
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB
+   row-size=4B cardinality=7.30K
 ====
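
The "cardinality=" annotations added throughout the plans above use an
abbreviated metric form (963, 7.30K, 200.52K, 6.00M): values below one
thousand print as-is, and larger values print with two decimals and a
unit suffix. The following is a minimal, self-contained Java sketch of
that convention only; it is an illustration under assumed rounding and
suffix rules, not the actual PrintUtils code from this patch, and the
class name, the -1 "no estimate" convention, and the suffix table are
hypothetical.

  public class CardinalityFormat {
    // Unit suffixes for successive factors of 1000 (assumed set).
    private static final String[] UNITS = {"", "K", "M", "B", "T"};

    // Formats an estimated row count in abbreviated metric form,
    // e.g. 7300 -> "7.30K", 963 -> "963", 6001215 -> "6.00M".
    // A negative cardinality is assumed here to mean "no estimate".
    public static String format(long cardinality) {
      if (cardinality < 0) return "unavailable";
      double value = cardinality;
      int unit = 0;
      // Scale down by 1000 until the value fits under the next suffix.
      while (value >= 1000 && unit < UNITS.length - 1) {
        value /= 1000;
        unit++;
      }
      return unit == 0 ? Long.toString(cardinality)
                       : String.format("%.2f%s", value, UNITS[unit]);
    }

    public static void main(String[] args) {
      System.out.println(format(7300));     // 7.30K
      System.out.println(format(963));      // 963
      System.out.println(format(6001215));  // 6.00M
    }
  }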
