Calcite's Programs.heuristicJoinOrder method takes a bushy boolean parameter: if bushy is true, it chooses MultiJoinOptimizeBushyRule; otherwise it chooses LoptOptimizeJoinRule. Glad to learn that LoptOptimizeJoinRule can also produce bushy trees, @Jinfeng.
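For reference, here is a minimal sketch of where that switch lives (the wrapper class is hypothetical; Programs.heuristicJoinOrder is the actual Calcite entry point):

    import org.apache.calcite.plan.RelOptRule;
    import org.apache.calcite.tools.Program;
    import org.apache.calcite.tools.Programs;

    /** Hypothetical wrapper showing Calcite's bushy switch. */
    public final class JoinOrderPrograms {
      private JoinOrderPrograms() {}

      // Calcite first collapses a chain of joins into a single MultiJoin
      // (JoinToMultiJoinRule); the returned program then re-expands it with
      // MultiJoinOptimizeBushyRule when bushy is true, or with
      // LoptOptimizeJoinRule when it is false.
      public static Program joinOrdering(Iterable<? extends RelOptRule> ruleSet, boolean bushy) {
        // The last argument is minJoinCount: the join-ordering pass only
        // runs when the query contains at least this many joins.
        return Programs.heuristicJoinOrder(ruleSet, bushy, 2);
      }
    }

Note that Drill's PlannerPhase.JOIN_PLANNING applies LoptOptimizeJoinRule directly in a rule set rather than through this program, so the sketch only illustrates where the bushy switch lives in Calcite itself.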
On Wed, May 29, 2019 at 8:37 AM Jinfeng Ni <[email protected]> wrote:

> I'm not sure how you got the conclusion that LoptOptimizeJoinRule would not produce bushy join plans. I just tried TPC-H Q5 and Q10 on the sample dataset, and the plans I got are not left-deep join trees. (I could not upload an image to show the visualized plans for those two queries.)
>
> My impression is that LoptOptimizeJoinRule can produce both bushy and left-deep join trees.
>
> For example, here is the plan for Q5. Operator 00-07 HashJoin has two inputs from two HashJoins.
>
> 00-00    Screen : rowType = RecordType(ANY n_name, ANY revenue): rowcount = 6017.5, cumulative cost = {358148.75 rows, 4407383.86090918 cpu, 0.0 io, 0.0 network, 1250857.6 memory}, id = 10943
> 00-01      Project(n_name=[$0], revenue=[$1]) : rowType = RecordType(ANY n_name, ANY revenue): rowcount = 6017.5, cumulative cost = {357547.0 rows, 4406782.11090918 cpu, 0.0 io, 0.0 network, 1250857.6 memory}, id = 10942
> 00-02        SelectionVectorRemover : rowType = RecordType(ANY n_name, ANY revenue): rowcount = 6017.5, cumulative cost = {357547.0 rows, 4406782.11090918 cpu, 0.0 io, 0.0 network, 1250857.6 memory}, id = 10941
> 00-03          Sort(sort0=[$1], dir0=[DESC]) : rowType = RecordType(ANY n_name, ANY revenue): rowcount = 6017.5, cumulative cost = {351529.5 rows, 4400764.61090918 cpu, 0.0 io, 0.0 network, 1250857.6 memory}, id = 10940
> 00-04            HashAgg(group=[{0}], revenue=[SUM($1)]) : rowType = RecordType(ANY n_name, ANY revenue): rowcount = 6017.5, cumulative cost = {345512.0 rows, 4098567.0 cpu, 0.0 io, 0.0 network, 1154577.6 memory}, id = 10939
> 00-05              Project(n_name=[$11], $f1=[*($7, -(1, $8))]) : rowType = RecordType(ANY n_name, ANY $f1): rowcount = 60175.0, cumulative cost = {285337.0 rows, 2895067.0 cpu, 0.0 io, 0.0 network, 95497.6 memory}, id = 10938
> 00-06                Project(c_custkey=[$7], c_nationkey=[$8], o_custkey=[$4], o_orderkey=[$5], o_orderdate=[$6], l_orderkey=[$0], l_suppkey=[$1], l_extendedprice=[$2], l_discount=[$3], s_suppkey=[$9], s_nationkey=[$10], n_name=[$11], n_nationkey=[$12], n_regionkey=[$13], r_regionkey=[$14], r_name=[$15]) : rowType = RecordType(ANY c_custkey, ANY c_nationkey, ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY l_orderkey, ANY l_suppkey, ANY l_extendedprice, ANY l_discount, ANY s_suppkey, ANY s_nationkey, ANY n_name, ANY n_nationkey, ANY n_regionkey, ANY r_regionkey, ANY r_name): rowcount = 60175.0, cumulative cost = {225162.0 rows, 2654367.0 cpu, 0.0 io, 0.0 network, 95497.6 memory}, id = 10937
> 00-07                  HashJoin(condition=[AND(=($1, $9), =($8, $10))], joinType=[inner]) : rowType = RecordType(ANY l_orderkey, ANY l_suppkey, ANY l_extendedprice, ANY l_discount, ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY c_custkey, ANY c_nationkey, ANY s_suppkey, ANY s_nationkey, ANY n_name, ANY n_nationkey, ANY n_regionkey, ANY r_regionkey, ANY r_name): rowcount = 60175.0, cumulative cost = {225162.0 rows, 2654367.0 cpu, 0.0 io, 0.0 network, 95497.6 memory}, id = 10936
> 00-09                    HashJoin(condition=[=($5, $0)], joinType=[inner]) : rowType = RecordType(ANY l_orderkey, ANY l_suppkey, ANY l_extendedprice, ANY l_discount, ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY c_custkey, ANY c_nationkey): rowcount = 60175.0, cumulative cost = {164600.0 rows, 1206550.0 cpu, 0.0 io, 0.0 network, 92400.0 memory}, id = 10928
> 00-13                      Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=classpath:/tpch/lineitem.parquet]], selectionRoot=classpath:/tpch/lineitem.parquet, numFiles=1, usedMetadataFile=false, columns=[`l_orderkey`, `l_suppkey`, `l_extendedprice`, `l_discount`]]]) : rowType = RecordType(ANY l_orderkey, ANY l_suppkey, ANY l_extendedprice, ANY l_discount): rowcount = 60175.0, cumulative cost = {60175.0 rows, 240700.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 10922
> 00-12                      HashJoin(condition=[=($3, $0)], joinType=[inner]) : rowType = RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY c_custkey, ANY c_nationkey): rowcount = 3750.0, cumulative cost = {40500.0 rows, 213750.0 cpu, 0.0 io, 0.0 network, 26400.000000000004 memory}, id = 10927
> 00-17                        SelectionVectorRemover : rowType = RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate): rowcount = 3750.0, cumulative cost = {33750.0 rows, 153750.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 10925
> 00-19                          Filter(condition=[AND(>=($2, 1997-01-01), <($2, 1998-01-01 00:00:00))]) : rowType = RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate): rowcount = 3750.0, cumulative cost = {30000.0 rows, 150000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 10924
> 00-21                            Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=classpath:/tpch/orders.parquet]], selectionRoot=classpath:/tpch/orders.parquet, numFiles=1, usedMetadataFile=false, columns=[`o_custkey`, `o_orderkey`, `o_orderdate`]]]) : rowType = RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate): rowcount = 15000.0, cumulative cost = {15000.0 rows, 45000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 10923
> 00-16                        Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=classpath:/tpch/customer.parquet]], selectionRoot=classpath:/tpch/customer.parquet, numFiles=1, usedMetadataFile=false, columns=[`c_custkey`, `c_nationkey`]]]) : rowType = RecordType(ANY c_custkey, ANY c_nationkey): rowcount = 1500.0, cumulative cost = {1500.0 rows, 3000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 10926
> 00-08                    HashJoin(condition=[=($1, $3)], joinType=[inner]) : rowType = RecordType(ANY s_suppkey, ANY s_nationkey, ANY n_name, ANY n_nationkey, ANY n_regionkey, ANY r_regionkey, ANY r_name): rowcount = 100.0, cumulative cost = {287.0 rows, 2017.0 cpu, 0.0 io, 0.0 network, 457.6000000000001 memory}, id = 10935
> 00-11                      Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=classpath:/tpch/supplier.parquet]], selectionRoot=classpath:/tpch/supplier.parquet, numFiles=1, usedMetadataFile=false, columns=[`s_suppkey`, `s_nationkey`]]]) : rowType = RecordType(ANY s_suppkey, ANY s_nationkey): rowcount = 100.0, cumulative cost = {100.0 rows, 200.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 10929
> 00-10                      HashJoin(condition=[=($2, $3)], joinType=[inner]) : rowType = RecordType(ANY n_name, ANY n_nationkey, ANY n_regionkey, ANY r_regionkey, ANY r_name): rowcount = 25.0, cumulative cost = {62.0 rows, 417.0 cpu, 0.0 io, 0.0 network, 17.6 memory}, id = 10934
> 00-15                        Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=classpath:/tpch/nation.parquet]], selectionRoot=classpath:/tpch/nation.parquet, numFiles=1, usedMetadataFile=false, columns=[`n_name`, `n_nationkey`, `n_regionkey`]]]) : rowType = RecordType(ANY n_name, ANY n_nationkey, ANY n_regionkey): rowcount = 25.0, cumulative cost = {25.0 rows, 75.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 10930
> 00-14                        SelectionVectorRemover : rowType = RecordType(ANY r_regionkey, ANY r_name): rowcount = 1.0, cumulative cost = {11.0 rows, 34.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 10933
> 00-18                          Filter(condition=[=($1, 'EUROPE')]) : rowType = RecordType(ANY r_regionkey, ANY r_name): rowcount = 1.0, cumulative cost = {10.0 rows, 33.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 10932
> 00-20                            Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=classpath:/tpch/region.parquet]], selectionRoot=classpath:/tpch/region.parquet, numFiles=1, usedMetadataFile=false, columns=[`r_regionkey`, `r_name`]]]) : rowType = RecordType(ANY r_regionkey, ANY r_name): rowcount = 5.0, cumulative cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 10931
>
> On Mon, May 27, 2019 at 8:23 PM weijie tong <[email protected]> wrote:
> >
> > Thanks for the answer. The blog [1] from Hive shows that an optimal bushy-tree plan can give better query performance. In the bushy-join case, more of the hash joins' build sides can work in parallel, and the size of the intermediate results is reduced. As for the concern about planning time, most bushy-join optimizers use a heuristic planner [2] to identify the patterns that match a bushy join, which reduces the search space (that is also what Calcite does). I wonder whether we can replace LoptOptimizeJoinRule with MultiJoinOptimizeBushyRule.
> >
> > [1] https://hortonworks.com/blog/hive-0-14-cost-based-optimizer-cbo-technical-overview/
> > [2] http://www.vldb.org/pvldb/vol9/p1401-chen.pdf
> >
> > On Tue, May 28, 2019 at 5:48 AM Paul Rogers <[email protected]> wrote:
> > >
> > > Hi All,
> > >
> > > Weijie, do you have some example plans that appear to be sub-optimal and would be improved with a bushy join plan? What characteristic of the query or schema causes the need for a bushy plan?
> > >
> > > FWIW, Impala uses a compromise approach: it evaluates left-deep plans, then will "flip" a join if the build side turns out to be larger than the probe side. This may just be an artifact of Impala's cost model, which is designed for star schemas, looks only one step ahead, and struggles with queries that do not fit the pattern. (Impala especially struggles with multi-key joins and correlated filters on joined tables.) But since the classic data warehouse use case tends to have simple star schemas, the Impala approach works pretty well in practice. (It turns out that Snowflake, in their paper, claims to do something similar. [1])
> > >
> > > On the other hand, it might be that Calcite, because it uses a true cost model, already produces optimal plans and the join-flip trick is unnecessary.
> > >
> > > A case where this trick seemed to help is the idea of joining two fact tables, each of which is filtered via dimension tables. Making something up:
> > >
> > > - join on itemid
> > >   - join on sales.stateid = state.id
> > >     - state table where state.name = "CA"
> > >     - sales
> > >   - join on returns.reasonId = reason.id
> > >     - reason table where reason.name = "defective"
> > >     - returns
> > >
> > > That is, we have large fact tables for sales and returns. We filter both using a dimension table. Then we join the (greatly reduced) fact data sets on the item ID. A left-deep plan will necessarily be less efficient because of the need to move an entire fact set through a join. (Though the JPPD feature might reduce the cost by filtering early.)
> > > In any event, it would be easy to experiment with this idea in Drill. Drill already has several post-Calcite rule sets. It might be fairly easy to add one that implements the join-flip case. Running this experiment on a test workload would identify whether the rule is ever needed and, if it is triggered, whether the result improves performance.
> > >
> > > Thanks,
> > > - Paul
> > >
> > > [1] http://info.snowflake.net/rs/252-RFO-227/images/Snowflake_SIGMOD.pdf
> > >
> > > On Monday, May 27, 2019, 2:04:29 PM PDT, Aman Sinha <[email protected]> wrote:
> > >
> > > Hi Weijie,
> > > As you might imagine, bushy joins have pros and cons compared to left-deep-only plans. The main pro is that they enumerate many more plan choices, so the planner is more likely to find the optimal join order. On the other hand, there are significant cons: (a) by enumerating more join orders, they can substantially increase planning time (depending on the number of tables); (b) the size of the intermediate results produced by the joins must be accurately estimated in order to avoid situations where a hash join build side turns out to be orders of magnitude larger than estimated. This can happen easily in big data systems, where statistics are constantly changing due to new data ingestion and even running ANALYZE continuously is not feasible.
> > >
> > > That said, it is not a bad idea to experiment with such plans with, say, more than five-table joins and compare them with left-deep plans.
> > >
> > > Aman
> > >
> > > On Mon, May 27, 2019 at 7:00 AM weijie tong <[email protected]> wrote:
> > >
> > > > Hi all:
> > > > Does anyone know why we don't support bushy joins in query plan generation while the hep planner is enabled? The codebase shows that PlannerPhase.JOIN_PLANNING uses LoptOptimizeJoinRule, not Calcite's MultiJoinOptimizeBushyRule.
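As a concrete starting point for the experiment Paul suggests, here is a minimal sketch of such a post-planning join flip (the rule name and wiring are hypothetical, not actual Drill code): it swaps a join's inputs when the estimated build (right) side is larger than the probe (left) side, and leans on Calcite's JoinCommuteRule.swap to rewrite the join condition's field references.

    import org.apache.calcite.plan.RelOptRule;
    import org.apache.calcite.plan.RelOptRuleCall;
    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.rel.core.Join;
    import org.apache.calcite.rel.metadata.RelMetadataQuery;
    import org.apache.calcite.rel.rules.JoinCommuteRule;

    /** Hypothetical sketch of the join-flip experiment; not Drill code. */
    public class FlipLargerBuildSideRule extends RelOptRule {
      public static final FlipLargerBuildSideRule INSTANCE =
          new FlipLargerBuildSideRule();

      private FlipLargerBuildSideRule() {
        super(operand(Join.class, any()), "FlipLargerBuildSideRule");
      }

      @Override public void onMatch(RelOptRuleCall call) {
        final Join join = call.rel(0);
        final RelMetadataQuery mq = call.getMetadataQuery();
        final Double probe = mq.getRowCount(join.getLeft());   // probe side
        final Double build = mq.getRowCount(join.getRight());  // build side
        if (probe == null || build == null || build <= probe) {
          return;  // no estimates, or already building on the smaller side
        }
        // JoinCommuteRule.swap exchanges the two inputs and rewrites the
        // join condition's field references; it returns null when the join
        // cannot be commuted (e.g. certain outer joins).
        final RelNode swapped = JoinCommuteRule.swap(join);
        if (swapped != null) {
          call.transformTo(swapped);
        }
      }
    }

Registered in one of Drill's post-Calcite rule sets, a rule like this would fire only when the row-count estimates say the build side is the larger input, which is exactly the mis-estimation case Aman warns about above.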
