Tim Armstrong has posted comments on this change. ( http://gerrit.cloudera.org:8080/15096 )
Change subject: WIP: IMPALA-9156: share broadcast join builds
......................................................................


Patch Set 6:

(7 comments)

Still a WIP but fixed some of the issues.

http://gerrit.cloudera.org:8080/#/c/15096/6//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/15096/6//COMMIT_MSG@18
PS6, Line 18: MarkNeedsDeepCopy
> nit: it could be mentioned in IMPALA-4179 that MarkNeedsDeepCopy() was adde
Done


http://gerrit.cloudera.org:8080/#/c/15096/6/be/src/exec/partitioned-hash-join-builder.h
File be/src/exec/partitioned-hash-join-builder.h:

http://gerrit.cloudera.org:8080/#/c/15096/6/be/src/exec/partitioned-hash-join-builder.h@183
PS6, Line 183:   /// At each state transition where the builder state needs to be mutated, all probe
             :   /// threads must arrive at the barrier before proceeding.
> Should this also occur if one of the probes has 0 rows, isn't there an opti
This is still true. A probe thread still needs to participate in each state transition even if it has no real work to do. If there is skew in the probe data, it's possible that some probe threads end up blocked (that's kinda just a limitation of this lockstep approach). Added a sentence about this.

The builder keeps track of whether there are probe rows in *any* of the probe partitions for a build partition, and if there are none it can just discard that build partition. That avoids some iterations of the algorithm.


http://gerrit.cloudera.org:8080/#/c/15096/6/be/src/exec/partitioned-hash-join-node.cc
File be/src/exec/partitioned-hash-join-node.cc:

http://gerrit.cloudera.org:8080/#/c/15096/6/be/src/exec/partitioned-hash-join-node.cc@578
PS6, Line 578:     out_batch->MarkNeedsDeepCopy();
> It is not self evident at the first glance that this causes out_batch->AtCa
Lol yeah, it was evident to me but probably no-one else in the world. I've spent way too much time with these row batch flags.

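To illustrate the lockstep point from the partitioned-hash-join-builder.h reply above, here is a minimal sketch. It is written in Java with java.util.concurrent.CyclicBarrier purely for illustration and is not the backend's barrier code; the class name LockstepProbeSketch, the run() helper and the row counts are all hypothetical. It only shows that a probe thread with zero rows still has to arrive at the barrier for every state transition, and that skewed threads make the others wait.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only: not Impala code.
final class LockstepProbeSketch {
  // rowsPerThreadPerRound[t][r] is the number of probe rows thread t handles
  // in round r. Returns how many state transitions (barrier trips) happened.
  static int run(int[][] rowsPerThreadPerRound) throws InterruptedException {
    final int numThreads = rowsPerThreadPerRound.length;
    final int numRounds = rowsPerThreadPerRound[0].length;
    final AtomicInteger transitions = new AtomicInteger();
    // The barrier action runs once per round, after the last thread arrives.
    // It stands in for the point where shared builder state would be mutated.
    final CyclicBarrier barrier =
        new CyclicBarrier(numThreads, transitions::incrementAndGet);

    Thread[] threads = new Thread[numThreads];
    for (int t = 0; t < numThreads; t++) {
      final int[] rows = rowsPerThreadPerRound[t];
      threads[t] = new Thread(() -> {
        try {
          for (int round = 0; round < numRounds; round++) {
            long sum = 0;
            // Stand-in for probing; may be zero iterations for this thread.
            for (int i = 0; i < rows[round]; i++) sum += i;
            // Every thread must arrive here, even with no rows to probe.
            barrier.await();
          }
        } catch (InterruptedException | BrokenBarrierException e) {
          Thread.currentThread().interrupt();
        }
      });
      threads[t].start();
    }
    for (Thread thread : threads) thread.join();
    return transitions.get();
  }

  public static void main(String[] args) throws InterruptedException {
    // Thread 1 never has rows but still takes part in all three transitions.
    int[][] rows = {{1000, 0, 50}, {0, 0, 0}, {10, 2000, 0}};
    System.out.println(run(rows)); // prints 3
  }
}
```

In this sketch the thread with no rows spends each round blocked in await() until the slowest thread arrives, which is the limitation mentioned in the reply.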

http://gerrit.cloudera.org:8080/#/c/15096/6/be/src/runtime/query-state.h
File be/src/runtime/query-state.h:

http://gerrit.cloudera.org:8080/#/c/15096/6/be/src/runtime/query-state.h@243
PS6, Line 243: to
> nit: "to" not needed
Done


http://gerrit.cloudera.org:8080/#/c/15096/6/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
File fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java:

http://gerrit.cloudera.org:8080/#/c/15096/6/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java@453
PS6, Line 453:     int leftChildNodes = leftChildFragment.getNumNodes();
             :     if (rhsTree.getCardinality() != -1) {
             :       rhsDataSize = Math.round(
             :           rhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(rhsTree));
             :       if (leftChildNodes != -1) {
             :         // RHS data must be broadcast once to each node.
             :         broadcastCost = 2 * rhsDataSize * leftChildNodes;
             :       }
             :     }
> What happens with null aware anti joins? Is the data broadcast "num instanc
You're correct. I added a comment. I think it would be best to live with the inaccuracy until I fix the NAAJ instead of churning this code.


http://gerrit.cloudera.org:8080/#/c/15096/6/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
File fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java:

http://gerrit.cloudera.org:8080/#/c/15096/6/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java@367
PS6, Line 367: ExecutorMembershipSnapshot.getCluster().numExecutors()
> nit: the result will be the same, but using numNodes_ here seems more logic
Done. I also introduced some intermediate variables that hopefully make it more readable.

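For reference, the arithmetic in the quoted DistributedPlanner.java lines above can be worked through in a standalone sketch. The class BroadcastCostSketch and its method are hypothetical, and returning -1 when an input is unknown is an assumption, since the quoted lines do not show how rhsDataSize and broadcastCost are initialized. The factor of 2 is taken from the quoted line as-is. This covers only the per-node broadcast case and does not model the null-aware anti join inaccuracy discussed above.

```java
// Hypothetical illustration only: mirrors the quoted snippet, not the planner.
final class BroadcastCostSketch {
  // Returns the estimated broadcast cost, or -1 if the RHS cardinality or the
  // number of left-child nodes is unknown (the -1 sentinel is an assumption).
  static long broadcastCost(
      long rhsCardinality, double avgSerializedRowSize, int leftChildNodes) {
    if (rhsCardinality == -1 || leftChildNodes == -1) return -1;
    // Estimated serialized size of the whole RHS input.
    long rhsDataSize = Math.round(rhsCardinality * avgSerializedRowSize);
    // RHS data must be broadcast once to each node.
    return 2 * rhsDataSize * leftChildNodes;
  }

  public static void main(String[] args) {
    // 1000 rows * 16 bytes = 16000 bytes; 2 * 16000 * 3 nodes = 96000.
    System.out.println(broadcastCost(1000, 16.0, 3)); // prints 96000
    System.out.println(broadcastCost(-1, 16.0, 3));   // prints -1
  }
}
```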

http://gerrit.cloudera.org:8080/#/c/15096/6/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
File fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java:

http://gerrit.cloudera.org:8080/#/c/15096/6/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java@1298
PS6, Line 1298:       totalNodes = Math.min(numLocalNodes + numRemoteNodes, cluster.numExecutors());
              :       // Exit early if we have maxed out our estimate of hosts/instances, to avoid
              :       // extraneous work in case the number of scan ranges dominates the number of
              :       // nodes.
              :       if (totalNodes == maxPossibleInstances) break;
> These two lines do not seem to match: if getMaxInstancesPerNode() > 1, then
It turned out that this mostly got the right answer for the wrong reasons - you are right that it never broke out here for mt_dop > 1. My logic was pretty flawed here. I reworked the estimation to be more accurate and documented it better.


--
To view, visit http://gerrit.cloudera.org:8080/15096
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I4c67e4b2c87ed0fba648f1e1710addb885d66dc7
Gerrit-Change-Number: 15096
Gerrit-PatchSet: 6
Gerrit-Owner: Tim Armstrong <[email protected]>
Gerrit-Reviewer: Bikramjeet Vig <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-Comment-Date: Wed, 26 Feb 2020 06:23:30 +0000
Gerrit-HasComments: Yes
