Riza Suminto has uploaded a new patch set (#30) to the change originally 
created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 )

Change subject: IMPALA-11604 Planner changes for CPU usage
......................................................................

IMPALA-11604 Planner changes for CPU usage

This patch augments IMPALA-10992 by establishing infrastructure that
allows the weighted total amount of data to be processed to serve as
a new factor in the definition and selection of an executor group. In
this patch the weight component is set to 1.

Two flavors of the weighted amount of data processed are enabled in
this patch: the minimal amount and the maximal amount. The former is
the processing cost along the path with the largest sum of processing
costs, reflecting the best level of parallelism among fragments.
The latter is the sum of the processing costs of all fragments,
reflecting the worst level of parallelism. The geometric mean of the
minimal and the maximal amounts is used as the estimated processing
cost of the query.
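
The two flavors and their geometric mean can be sketched as follows.
This is only an illustration: the class FragmentCostSketch and its
methods are hypothetical names, not code from the patch, and it assumes
that "path" means a root-to-leaf path in the fragment tree.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical fragment-tree node used only for illustration. */
final class FragmentCostSketch {
  final long cost;
  final List<FragmentCostSketch> children;

  private FragmentCostSketch(long cost, List<FragmentCostSketch> children) {
    this.cost = cost;
    this.children = children;
  }

  static FragmentCostSketch leaf(long cost) {
    return new FragmentCostSketch(cost, Collections.emptyList());
  }

  static FragmentCostSketch node(long cost, FragmentCostSketch... children) {
    return new FragmentCostSketch(cost, Arrays.asList(children));
  }

  /** Minimal flavor: cost along the root-to-leaf path with the largest sum. */
  long minimalCost() {
    long largestChildPath = 0;
    for (FragmentCostSketch c : children) {
      largestChildPath = Math.max(largestChildPath, c.minimalCost());
    }
    return cost + largestChildPath;
  }

  /** Maximal flavor: sum of the costs of every fragment in the tree. */
  long maximalCost() {
    long sum = cost;
    for (FragmentCostSketch c : children) sum += c.maximalCost();
    return sum;
  }

  /** Geometric mean of the minimal and maximal amounts. */
  static double geometricMean(long minimal, long maximal) {
    return Math.sqrt((double) minimal * (double) maximal);
  }
}
```

For a root fragment of cost 10 with two leaf children of cost 20 and
30, the minimal amount is 40 and the maximal amount is 60.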

A fragment's total amount of data processed is the sum of the amounts
for every node in the fragment. The weighted amount of data processed
is computed with a general formula as follows.

  Processing cost is a pair: PC(D, N), where D = I * C * W

  where D is the weighted amount of data processed
        N is number of instances
        I is input cardinality
        C is expression evaluation cost per row, set to 1
        W is average row size
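
A minimal sketch of the general formula, assuming a simple value class.
ProcessingCostSketch and its field names are hypothetical and do not
reflect the actual ProcessingCost.java added by this patch.

```java
/** Hypothetical stand-in for the pair PC(D, N). */
final class ProcessingCostSketch {
  final double weightedData;  // D
  final int numInstances;     // N

  ProcessingCostSketch(double weightedData, int numInstances) {
    this.weightedData = weightedData;
    this.numInstances = numInstances;
  }

  /** General formula: D = I * C * W. */
  static ProcessingCostSketch of(long inputCardinality, double evalCostPerRow,
      double avgRowSize, int numInstances) {
    double d = inputCardinality * evalCostPerRow * avgRowSize;
    return new ProcessingCostSketch(d, numInstances);
  }
}
```

For example, 1000 input rows with C = 1 and an average row size of 8
give D = 8000.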

A description of the computation for each kind of plan node is
given below.

1. Aggregation node:
    C and W are the sum of the costs and partial row widths for each
    AggregateInfo object.

2. AnalyticEval node:
    C is the sum of the evaluation costs for the analytic functions,
    the partition-by equality predicate and the order-by equality
    predicate;

3. CardinalityCheck node:
    Both C and I are 1;

4. DataSource scan node:
    C is computed from a subset of the selection predicates excluding
    data source accepted predicates;

5. EmptySet node:
    I is 0;

6. Exchange node:
    A modification of the general formula when in broadcast mode:
    D = D * number of receivers;

7. Hash join node:
    probe side = PC(I0 * C(equi-join predicate) * W,  N)  +
            PC(output cardinality * C(other join predicate) * W, N)

    build side = PC(I1 * C(equi-join predicate) * W, N)

8. HBase scan node:
    N is 1

9. Hdfs and Kudu scan node:
    N is mt_dop when query option mt_dop >= 1, otherwise
    N is number of nodes * max scan threads;

10. Nested loop join node:
    When the right child is not a SingularRowSrc node:

      probe side = PC(I0 * C(equi-join predicate) * W, N)  +
              PC(output cardinality * C(other join predicate) * W, N)
      build side = PC(I1 * C(equi-join predicate) * W, N)

    When the right child is a SingularRowSrc node:

      probe side = PC(I0 * W, N)
      build side = PC(I0 * I1 * W, N)

11. Select node:
    Use the general formula;

12. SingularRowSrc node:
    Since the node is invoked once per input in nested loop join, the
    contribution of this node is computed in nested loop join;

13. Sort node:
    C is the evaluation cost for the sort expression and W is the width
    of the intermediate tuple being sorted;

14. Subplan node:
    C is 1. I is the product of the cardinalities of the left and the
    right child;

15. Union node:
    C is the cost of materializing rows from all non pass-through
    children. W is the width of all non pass-through children;

16. Unnest node:
    I is the cardinality of the containing subplan node and C is 1.
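
As an illustration of the per-node rules, the hash join (item 7) and
broadcast exchange (item 6) formulas can be sketched as below. The
class and parameter names are hypothetical, and the sketch assumes that
adding two PC values with the same N simply adds their D components.

```java
/** Hypothetical helpers mirroring the hash join and exchange formulas. */
final class JoinCostSketch {
  /** Probe side D: I0 * C(equi-join) * W + output cardinality * C(other) * W. */
  static double probeSideData(long probeCardinality, long outputCardinality,
      double equiJoinCost, double otherJoinCost, double rowSize) {
    double equiPart = probeCardinality * equiJoinCost * rowSize;
    double otherPart = outputCardinality * otherJoinCost * rowSize;
    return equiPart + otherPart;
  }

  /** Build side D: I1 * C(equi-join) * W. */
  static double buildSideData(long buildCardinality, double equiJoinCost,
      double rowSize) {
    return buildCardinality * equiJoinCost * rowSize;
  }

  /** Broadcast exchange: D = D * number of receivers. */
  static double broadcastData(double data, int numReceivers) {
    return data * numReceivers;
  }
}
```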

The processing cost for the data sink of a fragment is also computed
and added to that of the fragment.
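
A sketch of how a fragment total could be assembled from its nodes and
its data sink. FragmentTotalSketch is a hypothetical name and the
inputs are plain D values, not the patch's own types.

```java
/** Hypothetical helper: fragment D = sink D + sum of node D values. */
final class FragmentTotalSketch {
  static double fragmentData(double sinkData, double... nodeData) {
    double total = sinkData;
    for (double d : nodeData) total += d;
    return total;
  }
}
```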

This patch also assumes that instances overlap when nodes in a single
fragment have different numbers of instances. For example, when there
are 6 scan threads and 3 aggregation threads in a single fragment, 3
threads are used for both the scan and the aggregation, and the other
3 threads are used only for the scan.
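
Under this overlap assumption, the thread count of a fragment is the
largest count among its nodes rather than their sum. A minimal sketch,
with hypothetical names that are not taken from the patch:

```java
/** Hypothetical helpers for the overlapping-instances assumption. */
final class InstanceOverlapSketch {
  /** Threads needed by the fragment: the largest per-node instance count. */
  static int fragmentInstances(int... perNodeInstances) {
    int max = 0;
    for (int n : perNodeInstances) max = Math.max(max, n);
    return max;
  }

  /** Threads shared by two nodes: the smaller of the two counts. */
  static int sharedInstances(int a, int b) {
    return Math.min(a, b);
  }
}
```

With 6 scan threads and 3 aggregation threads, the fragment uses 6
threads, 3 of which are shared by both nodes.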

As an example, the best-case and worst-case processing costs for the
large TPC-DS query q14a and the tiny query q19 are as follows.

q14a:
  Best case: total=1271804127, numInstances=9, perInstance=141311569
  Worst case: total=5169193752, numInstances=12, perInstance=430766146

q19:
  Best case: total=1082950, numInstances=15, perInstance=72196
  Worst case: total=1082950, numInstances=15, perInstance=72196
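
The perInstance figures above are consistent with dividing total by
numInstances and truncating. The sketch below reproduces them under
that assumption; PerInstanceSketch is a hypothetical name, and the
actual computation in the patch may differ.

```java
/** Hypothetical helper: per-instance cost as a truncating division. */
final class PerInstanceSketch {
  static long perInstance(long total, int numInstances) {
    return total / numInstances;
  }

  public static void main(String[] args) {
    // Totals and instance counts taken from the q14a and q19 figures.
    System.out.println(perInstance(1271804127L, 9));
    System.out.println(perInstance(5169193752L, 12));
    System.out.println(perInstance(1082950L, 15));
  }
}
```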

Testing:
  1. Unit testing by examining the best and the worst processing cost
     computed for all plan nodes in all fragments for a small set of
     queries;
  2. Core tests.

Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a
---
M common/thrift/Frontend.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
M fe/src/main/java/org/apache/impala/analysis/Expr.java
M fe/src/main/java/org/apache/impala/analysis/SortInfo.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
M fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
M fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
M fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
M fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/JoinNode.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
M fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
A fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
M fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/SelectNode.java
M fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
M fe/src/main/java/org/apache/impala/planner/SortNode.java
M fe/src/main/java/org/apache/impala/planner/SubplanNode.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/planner/UnnestNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
A fe/src/main/java/org/apache/impala/util/AutoScaleUtil.java
M fe/src/main/java/org/apache/impala/util/ExprUtil.java
M fe/src/main/java/org/apache/impala/util/MathUtil.java
38 files changed, 743 insertions(+), 59 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/33/19033/30
--
To view, visit http://gerrit.cloudera.org:8080/19033
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a
Gerrit-Change-Number: 19033
Gerrit-PatchSet: 30
Gerrit-Owner: Qifan Chen <qfc...@hotmail.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kdesc...@cloudera.com>
Gerrit-Reviewer: Qifan Chen <qfc...@hotmail.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
