Tim Armstrong has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/15096 )

Change subject: WIP: IMPALA-9156: share broadcast join builds
......................................................................


Patch Set 6:

(7 comments)

Still a WIP but fixed some of the issues.

http://gerrit.cloudera.org:8080/#/c/15096/6//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/15096/6//COMMIT_MSG@18
PS6, Line 18: MarkNeedsDeepCopy
> nit: it could be mentioned in IMPALA-4179 that MarkNeedsDeepCopy() was adde
Done


http://gerrit.cloudera.org:8080/#/c/15096/6/be/src/exec/partitioned-hash-join-builder.h
File be/src/exec/partitioned-hash-join-builder.h:

http://gerrit.cloudera.org:8080/#/c/15096/6/be/src/exec/partitioned-hash-join-builder.h@183
PS6, Line 183: /// At each state transition where the builder state needs to be 
mutated, all probe
             : /// threads must arrive at the barrier before proceeding.
> Should this also occur if one of the probes has 0 rows, isn't there an opti
This is still true. A probe thread still needs to participate in each state 
transition even if it has no real work to do. If there was skew in the probe 
data, then it's possible some probe threads end up blocked (that's kinda just a 
limitation of this lockstep approach). Added a sentence about this.

The builder keeps track of whether there are probe rows in *any* of the probe 
partitions for a build partition, and can then just discard the builder 
partition. That avoids some iterations of the algorithm.


http://gerrit.cloudera.org:8080/#/c/15096/6/be/src/exec/partitioned-hash-join-node.cc
File be/src/exec/partitioned-hash-join-node.cc:

http://gerrit.cloudera.org:8080/#/c/15096/6/be/src/exec/partitioned-hash-join-node.cc@578
PS6, Line 578:             out_batch->MarkNeedsDeepCopy();
> It is not self evident at the first glance that this causes out_batch->AtCa
Lol yeah, it was evident to me but probably no-one else in the world. I've 
spent way too much time with these row batch flags.


http://gerrit.cloudera.org:8080/#/c/15096/6/be/src/runtime/query-state.h
File be/src/runtime/query-state.h:

http://gerrit.cloudera.org:8080/#/c/15096/6/be/src/runtime/query-state.h@243
PS6, Line 243: to
> nit: "to" not needed
Done


http://gerrit.cloudera.org:8080/#/c/15096/6/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
File fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java:

http://gerrit.cloudera.org:8080/#/c/15096/6/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java@453
PS6, Line 453:     int leftChildNodes = leftChildFragment.getNumNodes();
             :     if (rhsTree.getCardinality() != -1) {
             :       rhsDataSize = Math.round(
             :           rhsTree.getCardinality() * 
ExchangeNode.getAvgSerializedRowSize(rhsTree));
             :       if (leftChildNodes != -1) {
             :         // RHS data must be broadcast once to each node.
             :         broadcastCost = 2 * rhsDataSize * leftChildNodes;
             :       }
             :     }
> What happens with null aware anti joins? Is the data broadcast "num instanc
You're correct. I added a comment. I think it would be best to live with the 
inaccuracy until I fix the NAAJ instead of churning this code.


http://gerrit.cloudera.org:8080/#/c/15096/6/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
File fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java:

http://gerrit.cloudera.org:8080/#/c/15096/6/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java@367
PS6, Line 367: ExecutorMembershipSnapshot.getCluster().numExecutors()
> nit: the result will be the same, but using numNodes_ here seems more logic
Done. I also introduced some intermediate variables that hopefully make it more 
readable.


http://gerrit.cloudera.org:8080/#/c/15096/6/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
File fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java:

http://gerrit.cloudera.org:8080/#/c/15096/6/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java@1298
PS6, Line 1298:           totalNodes = Math.min(numLocalNodes + numRemoteNodes, 
cluster.numExecutors());
              :           // Exit early if we have maxed out our estimate of 
hosts/instances, to avoid
              :           // extraneous work in case the number of scan ranges 
dominates the number of
              :           // nodes.
              :           if (totalNodes == maxPossibleInstances) break;
> These two lines do not seem to match: if getMaxInstancesPerNode() > 1, then
It turned out that this mostly got the right answer for the wrong reasons - you 
are right that it never broke out here for mt_dop > 1.

My logic was pretty flawed here. I reworked the estimation to be more accurate 
and documented it better.



--
To view, visit http://gerrit.cloudera.org:8080/15096
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I4c67e4b2c87ed0fba648f1e1710addb885d66dc7
Gerrit-Change-Number: 15096
Gerrit-PatchSet: 6
Gerrit-Owner: Tim Armstrong <[email protected]>
Gerrit-Reviewer: Bikramjeet Vig <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-Comment-Date: Wed, 26 Feb 2020 06:23:30 +0000
Gerrit-HasComments: Yes

Reply via email to