Riza Suminto has uploaded a new patch set (#56) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 )
Change subject: IMPALA-11604 Planner changes for CPU usage
......................................................................

IMPALA-11604 Planner changes for CPU usage

This patch augments IMPALA-10992 by establishing an infrastructure to
allow the weighted total amount of data to process to be used as a new
factor in the definition and selection of an executor group. At the
basis of the CPU costing model, we define ProcessingCost as the cost
for a distinct PlanNode / DataSink / PlanFragment to process its input
rows globally across all of its instances. The costing algorithm then
tries to adjust the number of instances for each fragment by
considering their production-consumption ratio, and finally returns a
number representing the ideal CPU core count required for the query to
run efficiently. A more detailed explanation of the CPU costing
algorithm is given in the four steps below.

I. Compute ProcessingCost for each plan node and data sink.

The ProcessingCost of a PlanNode/DataSink is a weighted amount of data
processed by that node/sink. The basic ProcessingCost is computed with
the following general formula:

  ProcessingCost is a pair: PC(D, N), where D = I * (C + M)

  where D is the weighted amount of data processed
        I is the input cardinality
        C is the expression evaluation cost per row.
          Set to the total weight of expression evaluation in the
          node/sink.
        M is the materialization cost per row.
          Only used by scan and exchange nodes. Otherwise, 0.
        N is the number of instances. Defaults to
          D / MIN_COST_PER_THREAD (1 million), but is fixed for
          certain nodes/sinks and is adjustable in step III.

In this patch, the weight of each expression evaluation is set to a
constant of 1.
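For illustration, the general formula above can be read as the
following minimal Java sketch. The class, field, and constant names
here are illustrative only and do not match the ProcessingCost classes
added by this patch.

  // Minimal sketch of PC(D, N) with D = I * (C + M). Names are
  // illustrative and do not match the actual planner classes.
  public class ProcessingCostSketch {
    // Assumed from the description above: 1 million cost units per thread.
    private static final long MIN_COST_PER_THREAD = 1_000_000L;

    public final long totalCost;     // D = I * (C + M)
    public final long numInstances;  // N

    public ProcessingCostSketch(long inputCardinality, double exprCostPerRow,
        double materializationCostPerRow) {
      this.totalCost = (long) Math.ceil(
          inputCardinality * (exprCostPerRow + materializationCostPerRow));
      // Default N: enough instances so that each one works on at least
      // MIN_COST_PER_THREAD worth of cost.
      this.numInstances = Math.max(1L, totalCost / MIN_COST_PER_THREAD);
    }
  }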
A description of the computation for each kind of PlanNode/DataSink is
given below.

01. AggregationNode: each AggregateInfo has its C set to the sum of its
    grouping expression and aggregate expression costs and is assigned
    a ProcessingCost individually. These ProcessingCosts are then
    summed to form the AggregationNode's ProcessingCost;
02. AnalyticEvalNode: C is the sum of the evaluation costs of the
    analytic functions;
03. CardinalityCheckNode: use the general formula, with I = 1;
04. DataSourceScanNode: follows the formula from the superclass
    ScanNode;
05. EmptySetNode: I = 0;
06. ExchangeNode: M = (average serialized row size) / 1024.
    A modification of the general formula when in broadcast mode:
    D = D * number of receivers;
07. HashJoinNode:
      probe cost = PC(I0 * C(equi-join predicates), N)
                   + PC(output cardinality * C(other join predicates), N)
      build cost = PC(I1 * C(equi-join predicates), N)
    with I0 and I1 as the input cardinality of the probe and build
    side respectively. If the plan node does not have a separate
    build, ProcessingCost is the sum of the probe cost and the build
    cost. Otherwise, ProcessingCost is equal to the probe cost;
08. HBaseScanNode, HdfsScanNode, and KuduScanNode: follow the formula
    from the superclass ScanNode;
09. NestedLoopJoinNode:
    When the right child is not a SingularRowSrcNode:
      probe cost = PC(I0 * C(equi-join predicates), N)
                   + PC(output cardinality * C(other join predicates), N)
      build cost = PC(I1 * C(equi-join predicates), N)
    When the right child is a SingularRowSrcNode:
      probe cost = PC(I0, N)
      build cost = PC(I0 * I1, N)
    with I0 and I1 as the input cardinality of the probe and build
    side respectively. If the plan node does not have a separate
    build, ProcessingCost is the sum of the probe cost and the build
    cost. Otherwise, ProcessingCost is equal to the probe cost;
10. ScanNode: M = (average row size) / 1024;
11. SelectNode: use the general formula;
12. SingularRowSrcNode: since the node is invoked once per input row
    of the nested loop join, its contribution is accounted for in the
    nested loop join;
13. SortNode: C is the evaluation cost of the sort expressions;
14. SubplanNode: C is 1. I is the product of the cardinalities of the
    left and the right child;
15. UnionNode: C is the cost of result expression evaluation from all
    non-pass-through children;
16. UnnestNode: I is the cardinality of the containing SubplanNode and
    C is 1;
17. DataStreamSink: M = 1 / (num rows per batch);
18. JoinBuildSink: ProcessingCost is the build cost of its associated
    JoinNode;
19. PlanRootSink: if result spooling is enabled, C is the cost of
    output expression evaluation. Otherwise, ProcessingCost is zero;
20. TableSink: C is the cost of output expression evaluation.
    TableSink subclasses (including HBaseTableSink, HdfsTableSink, and
    KuduTableSink) follow the same formula.

II. Compute the total ProcessingCost of a fragment.

The costing algorithm splits a query fragment into several segments
divided by blocking PlanNode/DataSink boundaries. Each segment is a
subtree of PlanNodes/DataSinks in the fragment with a DataSink or
blocking PlanNode as its root and non-blocking leaves. All other nodes
in the segment are non-blocking. PlanNodes or DataSinks that belong to
the same segment have their ProcessingCosts summed. A new
CostingSegment class is added to represent such a segment.

A fragment that has a blocking PlanNode or blocking DataSink is called
a blocking fragment. Currently, only JoinBuildSink is considered a
blocking DataSink. A fragment without any blocking nodes is called a
non-blocking fragment. Step IV discusses blocking and non-blocking
fragments further.

Take as an example the following fragment plan, which is blocking
since it has 3 blocking PlanNodes: 12:AGGREGATE, 06:SORT, and
08:TOP-N.

  F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=6 (adjusted from 12)
  fragment-costs=[34974657, 2159270, 23752870, 22]
  08:TOP-N [LIMIT=100]
  |  cost=900
  |
  07:ANALYTIC
  |  cost=23751970
  |
  06:SORT
  |  cost=2159270
  |
  12:AGGREGATE [FINALIZE]
  |  cost=34548320
  |
  11:EXCHANGE [HASH(i_class)]
     cost=426337

In bottom-up direction, there exist four segments in F03:
  Blocking segment 1: (11:EXCHANGE, 12:AGGREGATE)
  Blocking segment 2: 06:SORT
  Blocking segment 3: (07:ANALYTIC, 08:TOP-N)
  Non-blocking segment 4: DataStreamSink of F03

Therefore we have:
  PC(segment 1) = 426337 + 34548320
  PC(segment 2) = 2159270
  PC(segment 3) = 23751970 + 900
  PC(segment 4) = 22

These per-segment costs are stored in a CostingSegment tree rooted at
PlanFragment.rootSegment_, and are [34974657, 2159270, 23752870, 22]
respectively after the post-order traversal. This is implemented in
PlanFragment.computeCostingSegment() and
PlanFragment.collectCostingSegmentHelper().

III. Compute the effective degree of parallelism (EDoP) of fragments.

The costing algorithm walks the PlanFragments of the query plan tree
in post-order traversal. Upon visiting a PlanFragment, the costing
algorithm attempts to adjust the number of instances (effective
parallelism) of that fragment by comparing the ProcessingCost of its
child's last segment and the production-consumption rate between its
adjacent segments from step II. To simplify this initial
implementation, the parallelism of a PlanFragment containing an
EmptySetNode, UnionNode, or ScanNode remains unchanged (it follows
MT_DOP). This step is implemented in
PlanFragment.traverseEffectiveParallelism().
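For illustration, the following minimal sketch recomputes the
per-segment costs of the F03 example from step II by summing the
per-node costs within each segment. It is a simplification of what
CostingSegment tracks; the class and variable names are illustrative
only.

  // Illustrative only: sums the per-node costs of the F03 example into
  // per-segment costs, reproducing [34974657, 2159270, 23752870, 22].
  public class SegmentCostExample {
    public static void main(String[] args) {
      long[][] segments = {
          {426337L, 34548320L},  // segment 1: 11:EXCHANGE + 12:AGGREGATE
          {2159270L},            // segment 2: 06:SORT
          {23751970L, 900L},     // segment 3: 07:ANALYTIC + 08:TOP-N
          {22L}                  // segment 4: DataStreamSink of F03
      };
      for (long[] segment : segments) {
        long total = 0;
        for (long nodeCost : segment) total += nodeCost;
        System.out.println(total);
      }
    }
  }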
IV. Compute the EDoP of the query.

The effective parallelism of a query is the upper bound on the number
of CPU cores that can work on the query in parallel, taking into
account the overlap between fragment execution and blocking operators.
We compute this in a post-order traversal similar to step III and
split the query plan tree into blocking fragment subtrees similar to
step II. The following is an example of a query plan from TPCDS-Q12.

  F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
  PLAN-ROOT SINK
  |
  13:MERGING-EXCHANGE [UNPARTITIONED]
  |
  F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=3 (adjusted from 12)
  08:TOP-N [LIMIT=100]
  |
  07:ANALYTIC
  |
  06:SORT
  |
  12:AGGREGATE [FINALIZE]
  |
  11:EXCHANGE [HASH(i_class)]
  |
  F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=12
  05:AGGREGATE [STREAMING]
  |
  04:HASH JOIN [INNER JOIN, BROADCAST]
  |
  |--F05:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
  |  JOIN BUILD
  |  |
  |  10:EXCHANGE [BROADCAST]
  |  |
  |  F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
  |  02:SCAN HDFS [tpcds10_parquet.date_dim, RANDOM]
  |
  03:HASH JOIN [INNER JOIN, BROADCAST]
  |
  |--F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
  |  JOIN BUILD
  |  |
  |  09:EXCHANGE [BROADCAST]
  |  |
  |  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
  |  01:SCAN HDFS [tpcds10_parquet.item, RANDOM]
  |
  00:SCAN HDFS [tpcds10_parquet.web_sales, RANDOM]

A blocking fragment is a fragment that has a blocking PlanNode or
blocking DataSink in it. The costing algorithm splits the query plan
tree into blocking subtrees divided by blocking fragment boundaries.
Each blocking subtree has a blocking fragment as its root and
non-blocking fragments as intermediate or leaf nodes. In the TPCDS-Q12
example above, the query plan is divided into five blocking subtrees:
[(F05, F02), (F06, F01), F00, F03, F04].

A CoreCount is a container class that represents the CPU core
requirement of a subtree of a query or of the query itself. Each
blocking subtree has its fragments' adjusted instance counts summed
into a single CoreCount. This means that all fragments within a
blocking subtree can run in parallel and should be assigned one core
per fragment instance. The CoreCounts for the blocking subtrees in the
TPCDS-Q12 example are [4, 4, 12, 3, 1].

Upon visiting a blocking fragment, the maximum of the current
CoreCount (rooted at that blocking fragment) and the CoreCounts of the
previous blocking subtrees is taken, and the algorithm continues up to
the next ancestor PlanFragment. The final CoreCount for the TPCDS-Q12
example is 12. This step is implemented in
Planner.computeBlockingAwareCores() and
PlanFragment.traverseBlockingAwareCores().

The resulting CoreCount at the root PlanFragment is then taken as the
ideal CPU core count / EDoP of the query. This number is compared
against the total CPU count of an Impala executor group to determine
whether the query fits in that group or not. A backend flag
query_cpu_count_divisor is added to help scale the EDoP of a query
down or up if needed.

Two query options are added to control the entire computation of EDoP.

1. COMPUTE_PROCESSING_COST
   Controls whether to enable this CPU costing algorithm. MT_DOP must
   also be set > 0 for this query option to take effect.

2. PROCESSING_COST_MIN_THREADS
   Controls the minimum number of fragment instances (threads) that
   the costing algorithm is allowed to set. The costing algorithm is
   in charge of increasing the fragment's instance count beyond this
   minimum number through the producer-consumer rate comparison. The
   maximum number of fragment instances is the maximum of
   PROCESSING_COST_MIN_THREADS, MT_DOP, and the number of cores per
   executor, as sketched below.
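The bounds described above can be summarized by the following minimal
sketch. It is illustrative only; the real adjustment happens in
PlanFragment.traverseEffectiveParallelism(), and the method and
variable names here are not the actual planner code.

  // Illustrative only: clamps a cost-based instance count to the range
  // allowed by PROCESSING_COST_MIN_THREADS, MT_DOP, and the number of
  // cores per executor, per the description above.
  public class ParallelismBoundsSketch {
    static int boundedInstanceCount(int costBasedCount,
        int processingCostMinThreads, int mtDop, int coresPerExecutor) {
      int maxParallelism =
          Math.max(processingCostMinThreads, Math.max(mtDop, coresPerExecutor));
      int minParallelism = processingCostMinThreads;
      return Math.min(maxParallelism, Math.max(minParallelism, costBasedCount));
    }
  }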
This patch also adds three backend flags to tune the algorithm.

1. query_cpu_count_divisor
   Divides the CPU requirement of a query to fit the total available
   CPU in the executor group. For example, setting the value to 2 will
   fit a query with CPU requirement 2X into an executor group with
   total available CPU X. Note that setting a fractional value less
   than 1 effectively multiplies the query's CPU requirement. A valid
   value is > 0.0. The default value is 1. (A small sketch of this
   scaling follows the testing notes below.)

2. processing_cost_use_equal_expr_weight
   If true, all expression evaluations are weighted equally to 1
   during the plan node's processing cost calculation. If false, the
   expression cost from IMPALA-2805 is used. Defaults to true.

3. min_processing_per_thread
   Minimum processing load (in processing cost units) that a fragment
   instance needs to work on before the planner considers increasing
   the instance count based on the processing cost rather than the
   MT_DOP setting. The decision is made per fragment. Setting this to
   a high number will reduce the parallelism of a fragment (more
   workload per fragment instance), while setting it to a low number
   will increase parallelism (less workload per fragment instance).
   The actual parallelism might still be constrained by the total
   number of cores in the selected executor group, MT_DOP, or the
   PROCESSING_COST_MIN_THREADS query option. Must be a positive
   integer. Currently defaults to 10M.

As an example, the following is the additional ProcessingCost
information printed to the coordinator log for Q3, Q12, and Q15 run on
TPCDS 10GB scale with 3 executors, MT_DOP=4,
PROCESSING_COST_MIN_THREADS=4, and
processing_cost_use_equal_expr_weight=false.

  Q3  CoreCount={total=12 trace=F00:12}
  Q12 CoreCount={total=12 trace=F00:12}
  Q15 CoreCount={total=15 trace=N07:3+F00:12}

Testing:
- Add TestTpcdsQueryWithProcessingCost, which is a similar run of
  TestTpcdsQuery, but with COMPUTE_PROCESSING_COST=1 and MT_DOP=4.
  Setting log level TRACE for PlanFragment and manually running
  TestTpcdsQueryWithProcessingCost in the minicluster shows several
  fragment instance count reductions from 12 to 9, 6, or 3 in the
  coordinator log.
- Add PlannerTest#testProcessingCost.
  An adjusted fragment instance count is indicated by
  "(adjusted from 12)" in the query profile.
- Add TestExecutorGroups::test_query_cpu_count_divisor.
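To make the query_cpu_count_divisor scaling described above concrete,
here is a minimal sketch of the comparison outlined in step IV. The
class and method names are illustrative, not the actual scheduler or
frontend code, and the executor group size in the example is an
assumption.

  // Illustrative only: scales the query's EDoP by query_cpu_count_divisor
  // and checks whether it fits the total CPU count of an executor group.
  public class CpuFitSketch {
    static boolean fitsExecutorGroup(int queryEdop, double queryCpuCountDivisor,
        int executorGroupTotalCores) {
      long scaledCores = (long) Math.ceil(queryEdop / queryCpuCountDivisor);
      return scaledCores <= executorGroupTotalCores;
    }

    public static void main(String[] args) {
      // TPCDS-Q12 example from step IV: EDoP = 12. Assume a hypothetical
      // executor group with 36 cores in total and the default divisor of 1.
      System.out.println(fitsExecutorGroup(12, 1.0, 36));  // true
    }
  }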
Co-authored-by: Riza Suminto <riza.sumi...@cloudera.com>

Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a
---
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/backend-gflag-util.cc
M common/thrift/BackendGflags.thrift
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
M fe/src/main/java/org/apache/impala/analysis/SortInfo.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
A fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java
A fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
A fe/src/main/java/org/apache/impala/planner/CoreCount.java
A fe/src/main/java/org/apache/impala/planner/CostingSegment.java
M fe/src/main/java/org/apache/impala/planner/DataSink.java
M fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
M fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
M fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
M fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
M fe/src/main/java/org/apache/impala/planner/JoinNode.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
M fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
A fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
A fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/SelectNode.java
M fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
M fe/src/main/java/org/apache/impala/planner/SortNode.java
M fe/src/main/java/org/apache/impala/planner/SubplanNode.java
A fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/TableSink.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/planner/UnnestNode.java
M fe/src/main/java/org/apache/impala/service/BackendConfig.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/main/java/org/apache/impala/util/ExprUtil.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
R fe/src/test/resources/fair-scheduler-3-groups.xml
R fe/src/test/resources/llama-site-3-groups.xml
A testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/custom_cluster/test_executor_groups.py
M tests/query_test/test_tpcds_queries.py
57 files changed, 11,062 insertions(+), 62 deletions(-)

  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/33/19033/56

--
To view, visit http://gerrit.cloudera.org:8080/19033
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a
Gerrit-Change-Number: 19033
Gerrit-PatchSet: 56
Gerrit-Owner: Qifan Chen <qfc...@hotmail.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kdesc...@cloudera.com>
Gerrit-Reviewer: Qifan Chen <qfc...@hotmail.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>