Hello Zoltan Borok-Nagy, Csaba Ringhofer, Bikramjeet Vig, Impala Public Jenkins,
I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/14859

to look at the new patch set (#38).

Change subject: IMPALA-4224: execute separate join builds fragments
......................................................................

IMPALA-4224: execute separate join builds fragments

This enables parallel plans with the join build in a separate fragment
and fixes all of the ensuing fallout. After this change, mt_dop plans
with joins have separate build fragments.

There is still a 1:1 relationship between join nodes and builders, so
the builders are only accessed by the join node's thread after the
handoff. This lets us defer the work required to make PhjBuilder and
NljBuilder safe to share between nodes.

Planner changes:
* Combined the parallel and distributed planning code paths.
* Misc fixes to generate reasonable thrift structures in the query exec
  requests, i.e. containing the right nodes.
* Fixes to the resource calculations for the separate build plans:
** Calculate separate join/build resource consumption.
** Simplified the resource estimation by calculating resource
   consumption for each fragment separately and assuming that all
   fragments hit their peak resource consumption at the same time.
   IMPALA-9255 is the follow-on to make the resource estimation more
   accurate.

Scheduler changes:
* Various fixes to correctly handle the multiple TPlanExecInfos that
  the planner generates for the different cohorts.
* Add logic to colocate build fragments with their parent fragments.

Runtime filter changes:
* Build sinks now produce runtime filters, which required planner and
  coordinator fixes to handle.

DataSink changes:
* Close the input plan tree before calling FlushFinal() to release
  resources. This depends on Send() not holding onto references to
  input batches, which was true everywhere except NljBuilder. This
  invariant is now documented.

Join builder changes:
* Add a common base class for PhjBuilder and NljBuilder with functions
  to handle synchronisation with the join node (a sketch of this
  handoff follows this message).
* Close the plan tree earlier in FragmentInstanceState::Exec() so that
  peak resource requirements are lower.
* The NLJ now always copies input batches so that it can close its
  input tree.

JoinNode changes:
* The join node blocks waiting for the build side to be ready, then
  eventually signals that it is done, allowing the builder to be
  cleaned up.
* NLJ and PHJ nodes handle both the integrated builder and the external
  builder. There is a 1:1 relationship between the node and the
  builder, so we don't deal with thread safety yet.
* Buffer reservations are transferred between the builder and the join
  node when running with the separate builder (see the second sketch
  below). This is not strictly necessary yet, since execution is
  single-threaded, but it will be important for the shared broadcast.
  - The builder transfers memory for probe buffers to the join node at
    the end of each build phase.
  - At the end of each probe phase, the reservation is handed back to
    the builder (or released).

ExecSummary changes:
* The summary logic was modified to handle fragments connected via join
  builds. The logic is an extension of what was used for exchanges.

Testing:
* Enable --unlock_mt_dop for end-to-end tests.
* Migrate some tests to run as end-to-end tests instead of custom
  cluster tests.
* Add an mt_dop dimension to various end-to-end tests to provide
  coverage of join queries, spill-to-disk and cancellation.
* Ran single-node TPC-H and TPC-DS stress tests with mt_dop=0 and
  mt_dop=4.

Perf:
* Ran TPC-H scale factor 30 locally with mt_dop=0. No significant
  change.
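For reviewers skimming the handoff protocol: below is a minimal sketch
of the build/probe synchronisation described above, using hypothetical
names (JoinBuilderBase, HandoffToProbe(), DoneProbing()) rather than
the actual API added in be/src/exec/join-builder.{h,cc}. It only
illustrates the ordering guarantees the patch relies on.

    // Hypothetical sketch, not the patch's actual API: the builder
    // signals the join node when the build side is ready, and the join
    // node signals back when probing is done so the builder can close.
    #include <condition_variable>
    #include <mutex>

    class JoinBuilderBase {
     public:
      // Builder fragment: called once the build side is materialized.
      void HandoffToProbe() {
        std::lock_guard<std::mutex> l(lock_);
        build_ready_ = true;
        cv_.notify_all();
      }

      // Join node: blocks until the build side is ready to probe.
      void WaitForBuild() {
        std::unique_lock<std::mutex> l(lock_);
        cv_.wait(l, [this] { return build_ready_; });
      }

      // Join node: called at the end of the probe phase; this releases
      // the builder so its fragment can tear down build-side state.
      void DoneProbing() {
        std::lock_guard<std::mutex> l(lock_);
        probe_done_ = true;
        cv_.notify_all();
      }

      // Builder fragment: blocks until the probe side no longer needs
      // the build-side data structures; then it is safe to Close().
      void WaitForProbeDone() {
        std::unique_lock<std::mutex> l(lock_);
        cv_.wait(l, [this] { return probe_done_; });
      }

     private:
      std::mutex lock_;
      std::condition_variable cv_;
      bool build_ready_ = false;
      bool probe_done_ = false;
    };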
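The reservation transfer piggybacks on the same two handoff points. A
second sketch under the same caveat (ReservationClient and TransferTo()
are illustrative stand-ins, not the BufferPool API the patch touches):

    // Hypothetical sketch of the reservation transfer between builder
    // and join node; Impala tracks this via buffer pool client
    // reservations, which this toy class does not model.
    #include <cassert>
    #include <cstdint>

    class ReservationClient {
     public:
      explicit ReservationClient(int64_t bytes) : reservation_(bytes) {}

      // Move 'bytes' of unused reservation from this client to 'dst'.
      void TransferTo(ReservationClient* dst, int64_t bytes) {
        assert(bytes >= 0 && bytes <= reservation_);
        reservation_ -= bytes;
        dst->reservation_ += bytes;
      }

      int64_t reservation() const { return reservation_; }

     private:
      int64_t reservation_;  // Unused reservation owned by this client.
    };

    // End of build phase: builder hands probe-buffer reservation over.
    //   builder.TransferTo(&join_node, probe_buffer_bytes);
    // End of probe phase: the join node hands it back (or releases it).
    //   join_node.TransferTo(&builder, probe_buffer_bytes);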
Change-Id: I4403c8e62d9c13854e7830602ee613f8efc80c58
---
M be/src/exec/CMakeLists.txt
M be/src/exec/blocking-join-node.cc
M be/src/exec/blocking-join-node.h
M be/src/exec/data-sink.cc
M be/src/exec/data-sink.h
M be/src/exec/exec-node.h
A be/src/exec/join-builder.cc
A be/src/exec/join-builder.h
M be/src/exec/nested-loop-join-builder.cc
M be/src/exec/nested-loop-join-builder.h
M be/src/exec/nested-loop-join-node.cc
M be/src/exec/nested-loop-join-node.h
M be/src/exec/partitioned-hash-join-builder.cc
M be/src/exec/partitioned-hash-join-builder.h
M be/src/exec/partitioned-hash-join-node.cc
M be/src/exec/partitioned-hash-join-node.h
M be/src/runtime/bufferpool/buffer-pool-internal.h
M be/src/runtime/bufferpool/buffer-pool-test.cc
M be/src/runtime/bufferpool/buffer-pool.cc
M be/src/runtime/bufferpool/buffer-pool.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/initial-reservations.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/runtime-state.cc
M be/src/runtime/runtime-state.h
M be/src/runtime/spillable-row-batch-queue.h
M be/src/util/summary-util.cc
M bin/run-all-tests.sh
M common/thrift/DataSinks.thrift
M common/thrift/ExecStats.thrift
M common/thrift/PlanNodes.thrift
M common/thrift/Types.thrift
M fe/src/main/java/org/apache/impala/planner/DataSink.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
M fe/src/main/java/org/apache/impala/planner/JoinNode.java
M fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
M fe/src/main/java/org/apache/impala/planner/ParallelPlanner.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M shell/impala_client.py
M testdata/workloads/functional-planner/queries/PlannerTest/mem-limit-broadcast-join.test
M testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation-hdfs-num-rows-est-enabled.test
M testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
M testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
M testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
M testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
M testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
M testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
M testdata/workloads/functional-query/queries/QueryTest/spilling.test
M tests/common/impala_test_suite.py
M tests/custom_cluster/test_mt_dop.py
M tests/failure/test_failpoints.py
M tests/query_test/test_cancellation.py
M tests/query_test/test_join_queries.py
M tests/query_test/test_mt_dop.py
M tests/query_test/test_nested_types.py
M tests/query_test/test_runtime_filters.py
M tests/query_test/test_spilling.py
65 files changed, 2,086 insertions(+), 983 deletions(-)

git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/59/14859/38

--
To view, visit http://gerrit.cloudera.org:8080/14859
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I4403c8e62d9c13854e7830602ee613f8efc80c58
Gerrit-Change-Number: 14859
Gerrit-PatchSet: 38
Gerrit-Owner: Tim Armstrong <tarmstr...@cloudera.com>
Gerrit-Reviewer: Bikramjeet Vig <bikramjeet....@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <tarmstr...@cloudera.com>
Gerrit-Reviewer: Zoltan Borok-Nagy <borokna...@cloudera.com>