Riza Suminto has uploaded a new patch set (#35) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 )
Change subject: IMPALA-11604 Planner changes for CPU usage
......................................................................

IMPALA-11604 Planner changes for CPU usage

This patch augments IMPALA-10992 by establishing an infrastructure to
allow the weighted total amount of data to process to be used as a new
factor in the definition and selection of an executor group. At the
basis of the CPU costing model, we define ProcessingCost as the cost for
a distinct PlanNode / DataSink / PlanFragment to process its input rows
globally across all of its instances. The costing algorithm then tries
to adjust the number of instances for each fragment by considering the
production-consumption ratio and blocking-operator nature between their
plan nodes, and finally returns a number representing the ideal CPU core
count required for a query to run efficiently. The CPU costing algorithm
is explained in more detail in the four steps below.

I. Compute ProcessingCost for each plan node and data sink.

ProcessingCost of a PlanNode/DataSink is a weighted amount of data
processed by that node/sink. The basic ProcessingCost is computed with
the following general formula.

  ProcessingCost is a pair: PC(D, N), where D = I * (C + M)

  where D is the weighted amount of data processed.
        I is the input cardinality.
        C is the expression evaluation cost per row.
          Set to the total weight of expression evaluation in the
          node/sink.
        M is the materialization cost per row.
          Only used by scan and exchange nodes. Otherwise, 0.
        N is the number of instances.
          Defaults to D / MIN_COST_PER_THREAD (1 million), but is
          fixed for certain nodes/sinks and adjustable in step III.

In this patch, the weight of each expression evaluation is set to a
constant of 1. A description of the computation for each kind of
PlanNode/DataSink is given below.

01. AggregationNode:
    Each AggregateInfo has its C as the sum of its grouping
    expressions and aggregate expressions and is then assigned a
    single ProcessingCost individually. These ProcessingCosts are then
    summed to be the AggregationNode's ProcessingCost;

02. AnalyticEvalNode:
    C is the sum of the evaluation costs for analytic functions,
    partition-by-equal, and order-by-equal predicates;

03. CardinalityCheckNode:
    Use the general formula, I = 1;

04. DataSourceScanNode:
    Follow the formula from the superclass ScanNode;

05. EmptySetNode:
    I = 0;

06. ExchangeNode:
    M = 1 / num rows per batch.

    A modification of the general formula when in broadcast mode:
    D = D * number of receivers;

07. HashJoinNode:
    probe cost = PC(I0 * C(equiJoin predicate), N) +
                 PC(output cardinality * C(otherJoin predicate), N)
    build cost = PC(I1 * C(equiJoin predicate), N)

    With I0 and I1 as the input cardinality of the probe and build
    side respectively. If the plan node does not have a separate
    build, ProcessingCost is the sum of probe cost and build cost.
    Otherwise, ProcessingCost is equal to probe cost.

08. HBaseScanNode:
    Follow the formula from the superclass ScanNode;

09. HdfsScanNode and KuduScanNode:
    Follow the formula from the superclass ScanNode with modified N.
    N is mt_dop when query option mt_dop >= 1, otherwise
    N is the number of nodes * max scan threads;

10. NestedLoopJoinNode:
    When the right child is not a SingularRowSrcNode:

      probe cost = PC(I0 * C(equiJoin predicate), N) +
                   PC(output cardinality * C(otherJoin predicate), N)
      build cost = PC(I1 * C(equiJoin predicate), N)

    When the right child is a SingularRowSrcNode:

      probe cost = PC(I0, N)
      build cost = PC(I0 * I1, N)

    With I0 and I1 as the input cardinality of the probe and build
    side respectively. If the plan node does not have a separate
    build, ProcessingCost is the sum of probe cost and build cost.
    Otherwise, ProcessingCost is equal to probe cost.

11. ScanNode:
    M = average row size / ROWBATCH_MAX_MEM_USAGE (8MB);

12. SelectNode:
    Use the general formula;

13. SingularRowSrcNode:
    Since the node is invoked once per input in a nested loop join,
    the contribution of this node is computed in the nested loop
    join;

14. SortNode:
    C is the evaluation cost for the sort expression;

15. SubplanNode:
    C is 1. I is the product of the cardinalities of the left and the
    right child;

16. UnionNode:
    C is the cost of result expression evaluation from all
    non-pass-through children;

17. UnnestNode:
    I is the cardinality of the containing SubplanNode and C is 1.

18. DataStreamSink:
    M = 1 / num rows per batch.

19. JoinBuildSink:
    ProcessingCost is the build cost of its associated JoinNode.

20. PlanRootSink:
    If result spooling is enabled, C is the cost of output expression
    evaluation. Otherwise, ProcessingCost is zero.

21. TableSink:
    C is the cost of output expression evaluation.
    TableSink subclasses (including HBaseTableSink, HdfsTableSink, and
    KuduTableSink) follow the same formula;

II. Compute the total ProcessingCost of a fragment.

A query fragment can have one or more ProcessingCosts associated with
it. A fragment may have more than one ProcessingCost if it contains a
blocking PlanNode. The costing algorithm splits the fragment into
several blocking segments, with a boundary at each blocking PlanNode.
PlanNodes or DataSinks that belong to the same blocking segment have
their ProcessingCosts summed. If a fragment does not have a blocking
PlanNode, its ProcessingCost is the sum of all ProcessingCosts of its
PlanNodes and DataSink. This is implemented in
PlanFragment.computeFragmentProcessingCost() and
PlanFragment.collectProcessingCostHelper().

III. Compute the total ProcessingCost of the query.

The costing algorithm recursively sums and compares the ProcessingCosts
of PlanFragments, starting from the leaf fragments of the query plan
tree and going up to the root fragment. Upon visiting a blocking
PlanNode, the current ProcessingCost sum is compared against the
ProcessingCost of the blocking-child-subtrees. The maximum is taken and
the recursive algorithm continues to the next ancestor PlanFragment.
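
As a rough illustration of the general formula from step I and of the
sum-then-max comparison at a blocking PlanNode described above, here is
a minimal Python sketch. It is not the Java code in this patch: the
names basic_cost and blocking_aware_cost are hypothetical, rounding N
up to at least 1 is an assumption, and comparing the current sum
against each blocking child subtree separately is one possible reading
of the description.

```python
import math
from dataclasses import dataclass

# 1 million, as stated in the description of N in step I.
MIN_COST_PER_THREAD = 1_000_000


@dataclass(frozen=True)
class ProcessingCost:
    """PC(D, N): weighted data volume D processed by N instances."""
    total_cost: float   # D
    num_instances: int  # N


def basic_cost(input_cardinality, expr_cost_per_row,
               materialization_cost_per_row=0.0):
    """General formula D = I * (C + M); N defaults to D / MIN_COST_PER_THREAD."""
    d = input_cardinality * (expr_cost_per_row + materialization_cost_per_row)
    # Assumption: round up and never go below one instance.
    n = max(1, math.ceil(d / MIN_COST_PER_THREAD))
    return ProcessingCost(d, n)


def blocking_aware_cost(segment_costs, blocking_child_subtree_costs):
    """Sum the current segment, then keep the maximum against the
    costs of the blocking child subtrees (hypothetical helper)."""
    current_sum = sum(pc.total_cost for pc in segment_costs)
    return max([current_sum, *blocking_child_subtree_costs])


# Hypothetical numbers, chosen only to exercise the formula.
scan = basic_cost(10_000_000, expr_cost_per_row=2,
                  materialization_cost_per_row=0.5)
select = basic_cost(3_000_000, expr_cost_per_row=1)
total = blocking_aware_cost([scan, select], [20_000_000])
```

With these made-up inputs the scan contributes D = 25,000,000 with a
default N of 25, and the segment sum of 28,000,000 wins over the
20,000,000 blocking child subtree.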

During this recursive walk, the costing algorithm also tries to adjust
the number of instances (effective parallelism) of a fragment by
comparing the production-consumption rate and cost between adjacent
PlanNodes. The adjusted number of instances contributes towards
CoreRequirement in step IV, which in turn decides which executor group
the query runs on.

Adjusting the instance count of a fragment's ProcessingCost should not
change its total cost, except for a ProcessingCost associated with a
broadcast exchange node. A possible strategy to deal with the cost
change in a broadcast exchange is to recompute the cost in
PlanFragment.traverseBlockingAwareCost() whenever the instance count
changes. This patch, however, does not do that yet, under the
assumption that the cost change of a broadcast exchange node is
negligible.

Step III is implemented in Planner.computeBlockingAwareCost() and
PlanFragment.traverseBlockingAwareCost().

IV. Compute the efficient parallelism of the query.

Efficient parallelism of a query can be described as the ideal CPU core
count required to run a query efficiently when considering the overlap
between fragment execution and blocking operators. This is computed in
a recursive walk similar to step III. Adjacent fragments that are not
blocking have their adjusted instance counts summed, meaning that both
fragments can run in parallel and should be assigned 1 core per
fragment instance. The summation stops upon visiting a blocking
PlanFragment. At a blocking PlanFragment, the maximum between the
current CoreRequirement and the blocking-child-subtrees'
CoreRequirements is taken, and the recursive algorithm continues to the
next ancestor PlanFragment. The resulting CoreRequirement at the root
PlanFragment is then taken as the ideal CPU core requirement /
effective parallelism of the query. In future work, this number will be
compared against the total CPU count of an Impala executor group to
determine whether the query fits in that executor group.
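
The sum-versus-max rule of step IV can be sketched as a small recursive
function. This is a simplified model under stated assumptions, not the
logic of PlanFragment.traverseBlockingAwareCores(): the Fragment class,
its pipelined_children / blocking_children fields, and the fragment
names and instance counts are all hypothetical, and the sketch compares
against each blocking child subtree separately.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Fragment:
    """Hypothetical stand-in for a PlanFragment; not the class in the patch."""
    name: str
    num_instances: int  # adjusted instance count from step III
    # Children that stream rows into this fragment and run concurrently.
    pipelined_children: List["Fragment"] = field(default_factory=list)
    # Children behind a blocking boundary of this fragment.
    blocking_children: List["Fragment"] = field(default_factory=list)


def core_requirement(fragment: Fragment) -> int:
    # Non-blocking neighbours run in parallel: one core per instance, summed.
    current = fragment.num_instances + sum(
        core_requirement(child) for child in fragment.pipelined_children)
    # Blocking child subtrees do not overlap with the current pipeline,
    # so only the larger requirement is kept.
    blocked = [core_requirement(child) for child in fragment.blocking_children]
    return max([current, *blocked])


# Made-up plan shape, for illustration only.
scan = Fragment("F00", 12)
agg = Fragment("F01", 6, pipelined_children=[scan])
build = Fragment("F02", 3)
root = Fragment("F03", 1, pipelined_children=[agg], blocking_children=[build])
cores = core_requirement(root)
```

In this made-up plan the pipelined chain needs 1 + 6 + 12 = 19 cores,
which exceeds the 3 cores of the blocking subtree, so 19 is kept.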

Step IV is implemented in Planner.computeBlockingAwareCores() and
PlanFragment.traverseBlockingAwareCores().

Three query options are added to control this CPU costing algorithm.
1. COMPUTE_PROCESSING_COST
   Controls whether this CPU costing algorithm is enabled.
2. PROCESSING_COST_ALLOW_INSTANCE_INCREMENT
   Controls whether the costing algorithm is allowed to increase the
   instance count of a fragment beyond the count from the original
   plan.
3. PROCESSING_COST_MAX_INSTANCES
   Controls the maximum number of fragment instances that the costing
   algorithm is allowed to adjust to. This is an experimental query
   option that allows the fragment instance count to exceed the MT_DOP
   option. Setting it to 0 means MT_DOP is used to determine the
   maximum instance count instead.

As an example, the following is the additional ProcessingCost
information displayed in the query plan for Q3, Q12, and Q15 run on
TPCDS at 10GB scale with 3 executors and MT_DOP=4.

Q3:
Efficient parallelism: 15
Efficient parallelism trace: N10:3+F00:12

Q12:
Efficient parallelism: 21
Efficient parallelism trace: N12:9+F00:12

Q15:
Efficient parallelism: 21
Efficient parallelism trace: N14:3+F04:6+F00:12

Testing:
- Add TestTpcdsQueryWithProcessingCost, which is a similar run of
  TestTpcdsQuery, but with COMPUTE_PROCESSING_COST=1 and MT_DOP=4.
  Setting log level TRACE for PlanFragment and manually running
  TestTpcdsQueryWithProcessingCost in minicluster shows several
  fragment instance count reductions from 12 to 9, 6, or 3 in the
  coordinator log.
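
In the Q3, Q12, and Q15 examples above, the reported efficient
parallelism equals the sum of the per-entry counts in the trace line. A
small Python sketch for checking this follows. The trace format
(label:count entries joined by '+') is inferred from those three
examples only, and parse_trace / total_cores are hypothetical helper
names, not part of Impala.

```python
def parse_trace(trace):
    """Split a trace such as 'N10:3+F00:12' into (label, count) pairs."""
    entries = []
    for part in trace.split("+"):
        label, count = part.split(":")
        entries.append((label, int(count)))
    return entries


def total_cores(trace):
    """Sum the instance counts of all entries in the trace."""
    return sum(count for _, count in parse_trace(trace))


# Traces and expected totals copied from the examples above.
examples = {
    "Q3": ("N10:3+F00:12", 15),
    "Q12": ("N12:9+F00:12", 21),
    "Q15": ("N14:3+F04:6+F00:12", 21),
}
for query, (trace, expected) in examples.items():
    assert total_cores(trace) == expected, query
```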

Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a
---
M be/src/scheduling/scheduler.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M common/thrift/Frontend.thrift
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
M fe/src/main/java/org/apache/impala/analysis/Expr.java
M fe/src/main/java/org/apache/impala/analysis/SortInfo.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
A fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java
A fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
A fe/src/main/java/org/apache/impala/planner/CoreRequirement.java
M fe/src/main/java/org/apache/impala/planner/DataSink.java
M fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
M fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
M fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
M fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
M fe/src/main/java/org/apache/impala/planner/JoinNode.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
A fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
A fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/SelectNode.java
M fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
M fe/src/main/java/org/apache/impala/planner/SortNode.java
M fe/src/main/java/org/apache/impala/planner/SubplanNode.java
A fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/TableSink.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/planner/UnnestNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/main/java/org/apache/impala/util/ExprUtil.java
M tests/query_test/test_tpcds_queries.py
50 files changed, 1,833 insertions(+), 43 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/33/19033/35
--
To view, visit http://gerrit.cloudera.org:8080/19033
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a
Gerrit-Change-Number: 19033
Gerrit-PatchSet: 35
Gerrit-Owner: Qifan Chen <qfc...@hotmail.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kdesc...@cloudera.com>
Gerrit-Reviewer: Qifan Chen <qfc...@hotmail.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>