Riza Suminto has uploaded a new patch set (#49) to the change originally 
created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 )

Change subject: IMPALA-11604 Planner changes for CPU usage
......................................................................

IMPALA-11604 Planner changes for CPU usage

This patch augments IMPALA-10992 by establishing an infrastructure to
allow the weighted total amount of data to process to be used as a new
factor in the definition and selection of an executor group. At the
basis of the CPU costing model, we define ProcessingCost as a cost for a
distinct PlanNode / DataSink / PlanFragment to process its input rows
globally across all of its instances. The costing algorithm then tries
to adjust the number of instances for each fragment by considering their
production-consumption ratio, and then finally returns a number
representing an ideal CPU core count required for a query to run
efficiently. A more detailed explanation of the CPU costing algorithm
can be found in the four steps below.

I. Compute ProcessingCost for each plan node and data sink.

ProcessingCost of a PlanNode/DataSink is a weighted amount of data
processed by that node/sink. The basic ProcessingCost is computed with a
general formula as follows.

  ProcessingCost is a pair: PC(D, N), where D = I * (C + M)

  where D is the weighted amount of data processed
        I is the input cardinality
        C is the expression evaluation cost per row.
          Set to total weight of expression evaluation in node/sink.
        M is a materialization cost per row.
          Only used by scan and exchange node. Otherwise, 0.
        N is the number of instances.
          Default to D / MIN_COST_PER_THREAD (1 million), but
          is fixed for a certain node/sink and adjustable in step III.

In this patch, the weight of each expression evaluation is set to a
constant of 1. A description of the computation for each kind of
PlanNode/DataSink is given below.

01. AggregationNode:
    Each AggregateInfo has its C as a sum of grouping expression and
    aggregate expression and then assigned a single ProcessingCost
    individually. These ProcessingCosts then summed to be the Aggregation
    node's ProcessingCost;

02. AnalyticEvalNode:
    C is the sum of the evaluation costs for analytic functions;

03. CardinalityCheckNode:
    Use the general formula, I = 1;

04. DataSourceScanNode:
    Follow the formula from the superclass ScanNode;

05. EmptySetNode:
      I = 0;

06. ExchangeNode:
      M = (average serialized row size) / 1024

    A modification of the general formula when in broadcast mode:
      D = D * number of receivers;

07. HashJoinNode:
      probe cost = PC(I0 * C(equiJoin predicate),  N)  +
                   PC(output cardinality * C(otherJoin predicate), N)
      build cost = PC(I1 * C(equi-join predicate), N)

    With I0 and I1 as input cardinality of the probe and build side
    accordingly. If the plan node does not have a separate build, ProcessingCost
    is the sum of probe cost and build cost. Otherwise, ProcessingCost is
    equal to probeCost.

08. HbaseScanNode, HdfsScanNode, and KuduScanNode:
    Follow the formula from the superclass ScanNode;

09. Nested loop join node:
    When the right child is not a SingularRowSrcNode:

      probe cost = PC(I0 * C(equiJoin predicate), N)  +
                   PC(output cardinality * C(otherJoin predicate), N)
      build cost = PC(I1 * C(equiJoin predicate), N)

    When the right child is a SingularRowSrcNode:

      probe cost = PC(I0, N)
      build cost = PC(I0 * I1, N)

    With I0 and I1 as input cardinality of the probe and build side
    accordingly. If the plan node does not have a separate build, ProcessingCost
    is the sum of probe cost and build cost. Otherwise, ProcessingCost is
    equal to probeCost.

10. ScanNode:
      M = (average row size) / 1024;

11. SelectNode:
    Use the general formula;

12. SingularRowSrcNode:
    Since the node is involved once per input in nested loop join, the
    contribution of this node is computed in nested loop join;

13. SortNode:
    C is the evaluation cost for the sort expression;

14. SubplanNode:
    C is 1. I is the multiplication of the cardinality of the left and
    the right child;

15. Union node:
    C is the cost of result expression evaluation from all non-pass-through
    children;

16. Unnest node:
    I is the cardinality of the containing SubplanNode and C is 1.

17. DataStreamSink:
      M = 1 / num rows per batch.

18. JoinBuildSink:
    ProcessingCost is the build cost of its associated JoinNode.

29. PlanRootSink:
    If result spooling is enabled, C is the cost of output expression
    evaluation. Otherwise. ProcessingCost is zero.

20. TableSink:
    C is the cost of output expression evaluation.
    TableSink subclasses (including HBaseTableSink, HdfsTableSink, and
    KuduTableSink) follows the same formula;

II. Compute the total ProcessingCost of a fragment.

The costing algorithm splits a query fragment into several segments
divided by blocking PlanNode/DataSink boundary. Each fragment segment is
a subtree of PlanNodes/DataSink in the fragment with a DataSink or
blocking PlanNode as root and non-blocking leaves. All other nodes in
the segment are non-blocking. PlanNodes or DataSink that belong to the
same segment will have their ProcessingCost summed. A new SegmentCost
class is added to represent this segment.

A fragment that has a blocking PlanNode or blocking DataSink is called a
blocking fragment. A fragment without any blocking nodes is called a
non-blocking fragment. Step IV discuss further about Blocking and
non-blocking fragment.

Take an example of the following fragment plan.

  F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=6 (adjusted from 12)
  fragment-costs=[34974657, 2159270, 23752870, 22]
  08:TOP-N [LIMIT=100]
  |  cost=900
  |
  07:ANALYTIC
  |  cost=23751970
  |
  06:SORT
  |  cost=2159270
  |
  12:AGGREGATE [FINALIZE]
  |  cost=34548320
  |
  11:EXCHANGE [HASH(i_class)]
     cost=426337

In bottom-up direction, there exist four segments in F03:
1. (11:EXCHANGE, 12:AGGREGATE)
2. 06:SORT
3. (07:ANALYTIC, 08:TOP-N)
4. DataStreamSink of F03

Therefore we have:
PC(segment 1) = 426337+34548320
PC(segment 2) = 2159270
PC(segment 3) = 23751970+900
PC(segment 4) = 22

These per-segment costs stored in SegmentCost tree rooted at
PlanFragment.rootSegment_. In this example, post-order traversal of
rootSegment_ will show their associated cost as:
[34974657, 2159270, 23752870, 22].
F03 is also a blocking fragment since it has 3 blocking PlanNode:
12:AGGREGATE, 06:SORT, and 08:TOP-N.

A rootSegment_ is also called an Output ProcessingCost. A fragment's
Output ProcessingCost is used to adjust the parallelism of the parent
fragment in step III.

This is implemented in PlanFragment.computeSegmentCost() and
PlanFragment.collectSegmentCostHelper().

III. Compute the effective parallelism of query fragments.

The costing algorithm walks PlanFragments of the query plan tree in post
order manner. Upon visiting a PlanFragment, the costing algorithms will
try to adjust the number of instances (effective parallelism) of that
fragment by comparing the Output ProcessingCost of its child and
production-consumption rate between its adjacent segments from step II.
To simplify this initial implementation, the parallelism of PlanFragment
containing EmptySetNode, UnionNode, or ScanNode will remain
unchanged (follow MT_DOP).

This step is implemented at PlanFragment.traverseEffectiveParallelism().

IV. Compute the effective parallelism of the query.

Effective parallelism of a query is the maximum upper bound of CPU core
count that can parallelly work on a query when considering the
overlapping between fragment execution and blocking operators. We
compute this in a similar post order walk as step III and split the
query tree into blocking fragment subtrees similar to step II. The
following is an example of a query plan from TPCDS-Q12.

  F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
  PLAN-ROOT SINK
  |
  13:MERGING-EXCHANGE [UNPARTITIONED]
  |
  F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=3 (adjusted from 12)
  08:TOP-N [LIMIT=100]
  |
  07:ANALYTIC
  |
  06:SORT
  |
  12:AGGREGATE [FINALIZE]
  |
  11:EXCHANGE [HASH(i_class)]
  |
  F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=12
  05:AGGREGATE [STREAMING]
  |
  04:HASH JOIN [INNER JOIN, BROADCAST]
  |
  |--F05:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
  |  JOIN BUILD
  |  |
  |  10:EXCHANGE [BROADCAST]
  |  |
  |  F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
  |  02:SCAN HDFS [tpcds10_parquet.date_dim, RANDOM]
  |
  03:HASH JOIN [INNER JOIN, BROADCAST]
  |
  |--F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
  |  JOIN BUILD
  |  |
  |  09:EXCHANGE [BROADCAST]
  |  |
  |  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
  |  01:SCAN HDFS [tpcds10_parquet.item, RANDOM]
  |
  00:SCAN HDFS [tpcds10_parquet.web_sales, RANDOM]

A blocking fragment is a fragment that has a blocking PlanNode or
blocking DataSink in it. The costing algorithm splits the query plan
tree into several blocking subtrees divided by blocking fragment
boundary. Each blocking subtree has a blocking fragment as a root and
non-blocking fragments as leaves. All other fragments in the subtree are
non-blocking. From the TPCDS-Q12 example above, the query plan is
divided into five blocking subtrees of
[(F05, F02), (F06, F01), F00, F03, F04].

A CoreRequirement is a container class that represents the CPU core
requirement of a certain subtree of a query or the query itself. Each
blocking subtree will have its fragment's adjusted instance count summed
into a single CoreRequirement. This means that all fragments within a
blocking subtree can run in parallel and should be assigned one core per
fragment instance. The CoreRequirement for each blocking subtree in the
TPCDS-Q12 example is [4, 4, 12, 3, 1].

Upon visiting a blocking fragment, the maximum between current
CoreRequirement (rooted at that blocking fragment) vs previous blocking
subtrees CoreRequirements is taken and the algorithm continues up to the
next ancestor PlanFragment. The final CoreRequirement for the TPCDS-Q12
example is 12.

This step is implemented at Planner.computeBlockingAwareCores() and
PlanFragment.traverseBlockingAwareCores().

The resulting CoreRequirement at the root PlanFragment is then taken as
the ideal CPU core requirement / effective parallelism of the query.
This number will be compared against the total CPU count of an Impala
executor group to determine if it fits to run in that executor group set
or not. A backend flag query_cpu_requirement_divisor is added to help
scale down/up the CPU core requirement of a query if needed.

Three query options are added to control this CPU costing algorithm.
1. COMPUTE_PROCESSING_COST
   Control whether to enable this CPU costing algorithm or not.
   Must also set MT_DOP > 0 for this query option to take effect.

2. PROCESSING_COST_MAX_THREADS
   Control the maximum number of fragment instances (threads) that the
   costing algorithm is allowed to adjust. The costing algorithm is in
   charge of reducing the fragment's instance count through
   producer-consumer rate comparison. Currently, there is no option to
   set a threshold for the minimum number of instances.

3. PROCESSING_COST_ALLOW_THREAD_INCREMENT
   Control whether the costing algorithm is allowed to increase the
   instance count of a fragment beyond PROCESSING_COST_MAX_THREADS due
   to the large estimated workload. It is suggested to keep this to
   false until the min_processing_per_thread backend flag has been
   finely tuned.

This patch also adds three backend flags to tune the algorithm.
1. query_cpu_requirement_divisor
   Divide the CPU requirement of a query to fit the total available CPU
   in the executor group. For example, setting value 2 will fit the
   query with CPU requirement 2X to an executor group with total
   available CPU X. Note that setting with a fractional value less than
   1 effectively multiplies the query CPU requirement. A valid value is
   > 0.0. The default value is 1.

2. processing_cost_equal_expr_weight
   If true, all expression evaluations are weighted equally to 1 during
   the plan node's processing cost calculation. If false, expression
   cost from IMPALA-2805 will be used. Default to false.

3. min_processing_per_thread
   Minimum processing load that a fragment instance need to work on
   before planner consider increasing instance count. Used to adjust
   fragment instance count based on estimated workload rather than the
   MT_DOP setting. Must be a positive integer. Currently default to 10M.

As an example, the following are additional ProcessingCost information
printed to coordinator log for Q3, Q12, and Q15 ran on TPCDS 10GB scale,
3 executors, MT_DOP=4, and PROCESSING_COST_MAX_THREADS=4.

Q3
CoreRequirement={total=12 trace=F00:12}
ProcessingCost=87361819

Q12
CoreRequirement={total=12 trace=F00:12}
ProcessingCost=62731290

Q15
CoreRequirement={total=15 trace=N07:3+F00:12}
ProcessingCost=69759065

Testing:
- Add TestTpcdsQueryWithProcessingCost, which is a similar run of
  TestTpcdsQuery, but with COMPUTE_PROCESSING_COST=1 and MT_DOP=4.
  Setting log level TRACE for PlanFragment and manually running
  TestTpcdsQueryWithProcessingCost in minicluster shows several fragment
  instance count reduction from 12 to either of 9, 6, or 3 in
  coordinator log.
- Add PlannerTest#testProcessingCost
  Adjusted fragment count is indicated by "(adjusted from 12)" in the
  query profile.

Co-authored-by: Riza Suminto <riza.sumi...@cloudera.com>

Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a
---
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/backend-gflag-util.cc
M common/thrift/BackendGflags.thrift
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
M fe/src/main/java/org/apache/impala/analysis/SortInfo.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
A fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java
A fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
A fe/src/main/java/org/apache/impala/planner/CoreRequirement.java
M fe/src/main/java/org/apache/impala/planner/DataSink.java
M fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
M fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
M fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
M fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
M fe/src/main/java/org/apache/impala/planner/JoinNode.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
A fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
A fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
A fe/src/main/java/org/apache/impala/planner/SegmentCost.java
M fe/src/main/java/org/apache/impala/planner/SelectNode.java
M fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
M fe/src/main/java/org/apache/impala/planner/SortNode.java
M fe/src/main/java/org/apache/impala/planner/SubplanNode.java
A fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/TableSink.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/planner/UnnestNode.java
M fe/src/main/java/org/apache/impala/service/BackendConfig.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/main/java/org/apache/impala/util/ExprUtil.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
A 
testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/query_test/test_tpcds_queries.py
54 files changed, 11,121 insertions(+), 56 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/33/19033/49
--
To view, visit http://gerrit.cloudera.org:8080/19033
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a
Gerrit-Change-Number: 19033
Gerrit-PatchSet: 49
Gerrit-Owner: Qifan Chen <qfc...@hotmail.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kdesc...@cloudera.com>
Gerrit-Reviewer: Qifan Chen <qfc...@hotmail.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>

Reply via email to