Hi Volodymyr,
You are right, fancy join planning only makes sense if we have useful row and 
key cardinality information.
I seem to recall that Drill estimated row counts based on file size. Clearly, a 
10 MB file has far fewer rows than a 1 GB file. Do we no longer do that 
(or is my memory faulty)?
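To make the file-size idea concrete, here is a minimal sketch. It is not Drill's actual formula: estimate_row_count and sample_lines are hypothetical names, and the approach (divide the file size by the average width of a few sampled rows) is just one plausible way to do it.

```python
def estimate_row_count(file_size_bytes, sample_lines):
    """Rough row count: file size divided by the average sampled row width.

    Hypothetical helper for illustration; assumes one row per line and a
    one-byte newline terminator after each row.
    """
    if not sample_lines:
        raise ValueError("need at least one sampled row")
    total = sum(len(line.encode("utf-8")) + 1 for line in sample_lines)
    avg_row_bytes = total / len(sample_lines)
    return round(file_size_bytes / avg_row_bytes)


sample = ["1,CA,10", "2,OR,20"]  # two 7-byte CSV rows, 8 bytes with newline
small = estimate_row_count(10 * 1024 * 1024, sample)    # 10 MB file
large = estimate_row_count(1024 * 1024 * 1024, sample)  # 1 GB file
print(small, large)
```

The estimate is crude, but it is enough to tell the planner which input is likely to be much larger.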
Also, with the recent statistics work, we should be able to get proper 
information for row count, even for CSV or JSON, and other files without 
metadata headers or footers.
Also, with stats, we should have NDV for columns. A fault with the HMS solution 
is that the NDV is computed independently for each column. If a join is on a 
compound key, e.g. (state, city), one cannot, in general, combine per-column 
NDVs to get an overall key NDV. If we're doing our own stats, then it would be 
great if users could identify compound keys and have the stats mechanism gather 
NDV for that key: (state, city) in our example. (Maybe we do that?)
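A toy illustration of why per-column NDVs do not determine the compound-key NDV. The data and the ndv helper are made up for the example. Both tables have the same NDV for state and for city, yet the NDV of (state, city) differs by a factor of three.

```python
from itertools import product


def ndv(rows, cols):
    """Exact number of distinct values over the given column positions."""
    return len({tuple(row[c] for c in cols) for row in rows})


states = ["CA", "IL", "OR"]
cities = ["Portland", "Salem", "Springfield"]

# Each city name appears in exactly one state.
correlated = list(zip(states, cities))
# Every city name appears in every state.
independent = list(product(states, cities))

for name, rows in (("correlated", correlated), ("independent", independent)):
    print(name, ndv(rows, [0]), ndv(rows, [1]), ndv(rows, [0, 1]))
```

From the per-column numbers alone, all one can say is that the compound NDV lies somewhere between the larger per-column NDV and the smaller of the row count and the product of the per-column NDVs.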

One thing that neither Hive nor Impala does, but that Oracle has long done, is 
extrapolate stats. If we gathered stats from the first 1000 files, but the 
table now contains 2000 files, we should be able to simply scale up the 
1000-file stats to approximate new row counts and NDVs. Because Impala does not 
do this, users are forced to constantly update stats, which puts an 
unnecessarily heavy load on the system. The extrapolated stats won't be right, 
but they'll be less wrong than using the out-of-date values. Did we perhaps add 
this in our recent stats work?
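
A minimal sketch of the extrapolation, assuming stats are stored along with the file count they were gathered from. TableStats and extrapolate are hypothetical names, not anything in Drill, Impala, or Oracle. The linear scaling of NDV (capped at the row count) is the naive version of the idea and is itself an assumption.

```python
from dataclasses import dataclass


@dataclass
class TableStats:
    file_count: int  # number of files present when stats were gathered
    row_count: int
    ndv: dict        # column name -> number of distinct values


def extrapolate(stats, current_file_count):
    """Scale stale stats linearly by the growth in file count."""
    factor = current_file_count / stats.file_count
    rows = round(stats.row_count * factor)
    # Naive assumption: NDV grows with the data, but never exceeds row count.
    ndv = {col: min(rows, round(n * factor)) for col, n in stats.ndv.items()}
    return TableStats(current_file_count, rows, ndv)


old = TableStats(1000, 5_000_000, {"order_id": 5_000_000, "state": 50})
new = extrapolate(old, 2000)
print(new.row_count, new.ndv)
```

Note that linear scaling overestimates low-cardinality columns: the state column above goes from 50 to 100 distinct values, which is plainly too high. A real implementation would likely want to scale only columns whose NDV is close to the row count.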

Thanks,
- Paul

 

    On Wednesday, May 29, 2019, 2:50:39 PM PDT, Vova Vysotskyi 
<vvo...@gmail.com> wrote:  
 
 Hi all,

Regarding Impala's approach described by Paul, Drill does something similar:
after applying LoptOptimizeJoinRule and other optimizations,
SwapHashJoinVisitor is applied to the rel node tree to swap hash join inputs
in some cases. It was implemented in the scope of DRILL-2236.
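
For readers unfamiliar with the idea, a minimal sketch of a join flip follows. It is not the SwapHashJoinVisitor code: Rel, HashJoin, and flip_if_needed are hypothetical names, the condition is the one Paul described for Impala (flip when the build side is estimated to be larger than the probe side), and only inner joins are considered.

```python
from dataclasses import dataclass


@dataclass
class Rel:
    name: str
    row_count: float  # estimated rows produced by this input


@dataclass
class HashJoin:
    probe: Rel  # streamed side
    build: Rel  # side loaded into the hash table


def flip_if_needed(join):
    """Return an inner join whose build side is the smaller estimated input."""
    if join.build.row_count > join.probe.row_count:
        return HashJoin(probe=join.build, build=join.probe)
    return join


# Row counts chosen for illustration only.
join = HashJoin(probe=Rel("customer", 1500.0), build=Rel("orders", 3750.0))
flipped = flip_if_needed(join)
print(flipped.probe.name, flipped.build.name)
```

The flip is only as good as the row count estimates feeding it, which is the concern raised below for sources without row count information.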

Regarding Hive's use case, Hive has more information about the data, starting
with row count and selectivity, so it can estimate the row count after a join
more precisely. Drill, for some data sources (like CSV or JSON), does not even
have row count information, so it may be riskier to do join reordering and, in
particular, to put the result of a join on the build side more often.
Currently, there are a lot of issues caused by insufficient information
about the data, for example, DRILL-1162.

But anyway, I agree that we should experiment with this optimization, and
make a decision.

Kind regards,
Volodymyr Vysotskyi


On Wed, May 29, 2019 at 4:57 PM weijie tong <tongweijie...@gmail.com> wrote:

> Calcite's Programs.heuristicJoinOrder method takes a boolean bushy
> parameter. If bushy is true, it chooses MultiJoinOptimizeBushyRule;
> otherwise, it chooses LoptOptimizeJoinRule. Glad to hear that
> LoptOptimizeJoinRule can also produce a bushy tree, @Jinfeng.
>
> On Wed, May 29, 2019 at 8:37 AM Jinfeng Ni <j...@apache.org> wrote:
>
> > I'm not sure how you reached the conclusion that LoptOptimizeJoinRule
> > would not produce a bushy join plan. I just tried TPC-H Q5 and Q10 on the
> > sample dataset, and the plans I got are not left-deep join trees. (I could
> > not upload an image to show the visualized plans for those two queries.)
> >
> > My impression is that LoptOptimizeJoinRule can produce both bushy and
> > left-deep join trees.
> >
> > For example, here is the plan for Q5. Operator 00-07 HashJoin has two
> > inputs from two HashJoins.
> >
> > 00-00    Screen : rowType = RecordType(ANY n_name, ANY revenue):
> > rowcount = 6017.5, cumulative cost = {358148.75 rows, 4407383.86090918
> > cpu, 0.0 io, 0.0 network, 1250857.6 memory}, id = 10943
> > 00-01      Project(n_name=[$0], revenue=[$1]) : rowType =
> > RecordType(ANY n_name, ANY revenue): rowcount = 6017.5, cumulative
> > cost = {357547.0 rows, 4406782.11090918 cpu, 0.0 io, 0.0 network,
> > 1250857.6 memory}, id = 10942
> > 00-02        SelectionVectorRemover : rowType = RecordType(ANY n_name,
> > ANY revenue): rowcount = 6017.5, cumulative cost = {357547.0 rows,
> > 4406782.11090918 cpu, 0.0 io, 0.0 network, 1250857.6 memory}, id =
> > 10941
> > 00-03          Sort(sort0=[$1], dir0=[DESC]) : rowType =
> > RecordType(ANY n_name, ANY revenue): rowcount = 6017.5, cumulative
> > cost = {351529.5 rows, 4400764.61090918 cpu, 0.0 io, 0.0 network,
> > 1250857.6 memory}, id = 10940
> > 00-04            HashAgg(group=[{0}], revenue=[SUM($1)]) : rowType =
> > RecordType(ANY n_name, ANY revenue): rowcount = 6017.5, cumulative
> > cost = {345512.0 rows, 4098567.0 cpu, 0.0 io, 0.0 network, 1154577.6
> > memory}, id = 10939
> > 00-05              Project(n_name=[$11], $f1=[*($7, -(1, $8))]) :
> > rowType = RecordType(ANY n_name, ANY $f1): rowcount = 60175.0,
> > cumulative cost = {285337.0 rows, 2895067.0 cpu, 0.0 io, 0.0 network,
> > 95497.6 memory}, id = 10938
> > 00-06                Project(c_custkey=[$7], c_nationkey=[$8],
> > o_custkey=[$4], o_orderkey=[$5], o_orderdate=[$6], l_orderkey=[$0],
> > l_suppkey=[$1], l_extendedprice=[$2], l_discount=[$3], s_suppkey=[$9],
> > s_nationkey=[$10], n_name=[$11], n_nationkey=[$12], n_regionkey=[$13],
> > r_regionkey=[$14], r_name=[$15]) : rowType = RecordType(ANY c_custkey,
> > ANY c_nationkey, ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY
> > l_orderkey, ANY l_suppkey, ANY l_extendedprice, ANY l_discount, ANY
> > s_suppkey, ANY s_nationkey, ANY n_name, ANY n_nationkey, ANY
> > n_regionkey, ANY r_regionkey, ANY r_name): rowcount = 60175.0,
> > cumulative cost = {225162.0 rows, 2654367.0 cpu, 0.0 io, 0.0 network,
> > 95497.6 memory}, id = 10937
> > 00-07                  HashJoin(condition=[AND(=($1, $9), =($8,
> > $10))], joinType=[inner]) : rowType = RecordType(ANY l_orderkey, ANY
> > l_suppkey, ANY l_extendedprice, ANY l_discount, ANY o_custkey, ANY
> > o_orderkey, ANY o_orderdate, ANY c_custkey, ANY c_nationkey, ANY
> > s_suppkey, ANY s_nationkey, ANY n_name, ANY n_nationkey, ANY
> > n_regionkey, ANY r_regionkey, ANY r_name): rowcount = 60175.0,
> > cumulative cost = {225162.0 rows, 2654367.0 cpu, 0.0 io, 0.0 network,
> > 95497.6 memory}, id = 10936
> > 00-09                    HashJoin(condition=[=($5, $0)],
> > joinType=[inner]) : rowType = RecordType(ANY l_orderkey, ANY
> > l_suppkey, ANY l_extendedprice, ANY l_discount, ANY o_custkey, ANY
> > o_orderkey, ANY o_orderdate, ANY c_custkey, ANY c_nationkey): rowcount
> > = 60175.0, cumulative cost = {164600.0 rows, 1206550.0 cpu, 0.0 io,
> > 0.0 network, 92400.0 memory}, id = 10928
> > 00-13                      Scan(groupscan=[ParquetGroupScan
> > [entries=[ReadEntryWithPath [path=classpath:/tpch/lineitem.parquet]],
> > selectionRoot=classpath:/tpch/lineitem.parquet, numFiles=1,
> > usedMetadataFile=false, columns=[`l_orderkey`, `l_suppkey`,
> > `l_extendedprice`, `l_discount`]]]) : rowType = RecordType(ANY
> > l_orderkey, ANY l_suppkey, ANY l_extendedprice, ANY l_discount):
> > rowcount = 60175.0, cumulative cost = {60175.0 rows, 240700.0 cpu, 0.0
> > io, 0.0 network, 0.0 memory}, id = 10922
> > 00-12                      HashJoin(condition=[=($3, $0)],
> > joinType=[inner]) : rowType = RecordType(ANY o_custkey, ANY
> > o_orderkey, ANY o_orderdate, ANY c_custkey, ANY c_nationkey): rowcount
> > = 3750.0, cumulative cost = {40500.0 rows, 213750.0 cpu, 0.0 io, 0.0
> > network, 26400.000000000004 memory}, id = 10927
> > 00-17                        SelectionVectorRemover : rowType =
> > RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate): rowcount =
> > 3750.0, cumulative cost = {33750.0 rows, 153750.0 cpu, 0.0 io, 0.0
> > network, 0.0 memory}, id = 10925
> > 00-19                          Filter(condition=[AND(>=($2,
> > 1997-01-01), <($2, 1998-01-01 00:00:00))]) : rowType = RecordType(ANY
> > o_custkey, ANY o_orderkey, ANY o_orderdate): rowcount = 3750.0,
> > cumulative cost = {30000.0 rows, 150000.0 cpu, 0.0 io, 0.0 network,
> > 0.0 memory}, id = 10924
> > 00-21                            Scan(groupscan=[ParquetGroupScan
> > [entries=[ReadEntryWithPath [path=classpath:/tpch/orders.parquet]],
> > selectionRoot=classpath:/tpch/orders.parquet, numFiles=1,
> > usedMetadataFile=false, columns=[`o_custkey`, `o_orderkey`,
> > `o_orderdate`]]]) : rowType = RecordType(ANY o_custkey, ANY
> > o_orderkey, ANY o_orderdate): rowcount = 15000.0, cumulative cost =
> > {15000.0 rows, 45000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id =
> > 10923
> > 00-16                        Scan(groupscan=[ParquetGroupScan
> > [entries=[ReadEntryWithPath [path=classpath:/tpch/customer.parquet]],
> > selectionRoot=classpath:/tpch/customer.parquet, numFiles=1,
> > usedMetadataFile=false, columns=[`c_custkey`, `c_nationkey`]]]) :
> > rowType = RecordType(ANY c_custkey, ANY c_nationkey): rowcount =
> > 1500.0, cumulative cost = {1500.0 rows, 3000.0 cpu, 0.0 io, 0.0
> > network, 0.0 memory}, id = 10926
> > 00-08                    HashJoin(condition=[=($1, $3)],
> > joinType=[inner]) : rowType = RecordType(ANY s_suppkey, ANY
> > s_nationkey, ANY n_name, ANY n_nationkey, ANY n_regionkey, ANY
> > r_regionkey, ANY r_name): rowcount = 100.0, cumulative cost = {287.0
> > rows, 2017.0 cpu, 0.0 io, 0.0 network, 457.6000000000001 memory}, id =
> > 10935
> > 00-11                      Scan(groupscan=[ParquetGroupScan
> > [entries=[ReadEntryWithPath [path=classpath:/tpch/supplier.parquet]],
> > selectionRoot=classpath:/tpch/supplier.parquet, numFiles=1,
> > usedMetadataFile=false, columns=[`s_suppkey`, `s_nationkey`]]]) :
> > rowType = RecordType(ANY s_suppkey, ANY s_nationkey): rowcount =
> > 100.0, cumulative cost = {100.0 rows, 200.0 cpu, 0.0 io, 0.0 network,
> > 0.0 memory}, id = 10929
> > 00-10                      HashJoin(condition=[=($2, $3)],
> > joinType=[inner]) : rowType = RecordType(ANY n_name, ANY n_nationkey,
> > ANY n_regionkey, ANY r_regionkey, ANY r_name): rowcount = 25.0,
> > cumulative cost = {62.0 rows, 417.0 cpu, 0.0 io, 0.0 network, 17.6
> > memory}, id = 10934
> > 00-15                        Scan(groupscan=[ParquetGroupScan
> > [entries=[ReadEntryWithPath [path=classpath:/tpch/nation.parquet]],
> > selectionRoot=classpath:/tpch/nation.parquet, numFiles=1,
> > usedMetadataFile=false, columns=[`n_name`, `n_nationkey`,
> > `n_regionkey`]]]) : rowType = RecordType(ANY n_name, ANY n_nationkey,
> > ANY n_regionkey): rowcount = 25.0, cumulative cost = {25.0 rows, 75.0
> > cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 10930
> > 00-14                        SelectionVectorRemover : rowType =
> > RecordType(ANY r_regionkey, ANY r_name): rowcount = 1.0, cumulative
> > cost = {11.0 rows, 34.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id =
> > 10933
> > 00-18                          Filter(condition=[=($1, 'EUROPE')]) :
> > rowType = RecordType(ANY r_regionkey, ANY r_name): rowcount = 1.0,
> > cumulative cost = {10.0 rows, 33.0 cpu, 0.0 io, 0.0 network, 0.0
> > memory}, id = 10932
> > 00-20                            Scan(groupscan=[ParquetGroupScan
> > [entries=[ReadEntryWithPath [path=classpath:/tpch/region.parquet]],
> > selectionRoot=classpath:/tpch/region.parquet, numFiles=1,
> > usedMetadataFile=false, columns=[`r_regionkey`, `r_name`]]]) : rowType
> > = RecordType(ANY r_regionkey, ANY r_name): rowcount = 5.0, cumulative
> > cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id =
> > 10931
> >
> >
> > On Mon, May 27, 2019 at 8:23 PM weijie tong <tongweijie...@gmail.com>
> > wrote:
> >
> > > Thanks for the answer. The blog [1] from Hive shows that an optimal bushy
> > > tree plan can give better query performance. In the bushy join case,
> > > more of the hash join build sides can run in parallel, and the
> > > intermediate data size is reduced. As for the worry about planning time,
> > > most bushy join optimizers use a heuristic planner [2] to identify
> > > patterns that match a bushy join and so reduce the search space (that is
> > > also what Calcite does). I wonder whether we can replace
> > > LoptOptimizeJoinRule with MultiJoinOptimizeBushyRule.
> > >
> > > [1] https://hortonworks.com/blog/hive-0-14-cost-based-optimizer-cbo-technical-overview/
> > > [2] http://www.vldb.org/pvldb/vol9/p1401-chen.pdf
> > >
> > > On Tue, May 28, 2019 at 5:48 AM Paul Rogers <par0...@yahoo.com.invalid>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > Weijie, do you have some example plans that would appear to be
> > > > sub-optimal, and would be improved with a bushy join plan? What
> > > > characteristic of the query or schema causes the need for a bushy plan?
> > > >
> > > > FWIW, Impala uses a compromise approach: it evaluates left-deep plans,
> > > > then will "flip" a join if the build side turns out to be larger than
> > > > the probe side. This may just be an artifact of Impala's cost model,
> > > > which is designed for star schemas, looks only one step ahead, and
> > > > struggles with queries that do not fit the pattern. (Impala especially
> > > > struggles with multi-key joins and correlated filters on joined
> > > > tables.) But, since the classic data warehouse use case tends to have
> > > > simple star schemas, the Impala approach works pretty well in practice.
> > > > (Turns out that Snowflake, in their paper, claims to do something
> > > > similar. [1])
> > > >
> > > > On the other hand, it might be that Calcite, because it uses a true
> > > > cost model, already produces optimal plans and the join-flip trick is
> > > > unnecessary.
> > > >
> > > > A case where this trick seemed to help is the idea of joining two fact
> > > > tables, each of which is filtered via dimension tables. Making
> > > > something up:
> > > >
> > > > - join on itemid
> > > >  - join on sales.stateid = state.id
> > > >    - state table where state.name = "CA"
> > > >    - sales
> > > >  - join on returns.reasonId = reason.id
> > > >    - reason table where reason.name = "defective"
> > > >    - returns
> > > >
> > > >
> > > > That is, we have large fact tables for sales and returns. We filter
> > > > both using a dimension table. Then, we join the (greatly reduced) fact
> > > > data sets on the item ID. A left-deep plan will necessarily be less
> > > > efficient because of the need to move an entire fact set through a
> > > > join. (Though the JPPD feature might reduce the cost by filtering
> > > > early.)
> > > >
> > > >
> > > > In any event, it would be easy to experiment with this idea in Drill.
> > > > Drill already has several post-Calcite rule sets. It might be fairly
> > > > easy to add one that implements the join-flip case. Running this
> > > > experiment on a test workload would identify if the rule is ever
> > > > needed, and if it is triggered, if the result improves performance.
> > > >
> > > >
> > > > Thanks,
> > > > - Paul
> > > >
> > > > [1] http://info.snowflake.net/rs/252-RFO-227/images/Snowflake_SIGMOD.pdf
> > > >
> > > >
> > > >    On Monday, May 27, 2019, 2:04:29 PM PDT, Aman Sinha <
> > > > amansi...@gmail.com> wrote:
> > > >
> > > >  Hi Weijie,
> > > > As you might imagine, bushy joins have pros and cons compared to
> > > > left-deep-only plans. The main pro is that they enumerate a lot more
> > > > plan choices, so the planner is more likely to find the optimal join
> > > > order. On the other hand, there are significant cons: (a) by
> > > > enumerating more join orders, they would substantially increase
> > > > planning time (depending on the number of tables); (b) the size of the
> > > > intermediate results produced by the join must be accurately estimated
> > > > in order to avoid situations where the hash join build side turns out
> > > > to be orders of magnitude larger than estimated. This could happen
> > > > easily in big data systems where statistics are constantly changing due
> > > > to new data ingestion and even running ANALYZE continuously is not
> > > > feasible.
> > > > That said, it is not a bad idea to experiment with such plans with,
> > > > say, more than 5 table joins and compare with left-deep plans.
> > > >
> > > > Aman
> > > >
> > > > On Mon, May 27, 2019 at 7:00 AM weijie tong <tongweijie...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi all:
> > > > >  Does anyone know why we don't support bushy joins in query plan
> > > > > generation while the HEP planner is enabled? The codebase shows that
> > > > > PlannerPhase.JOIN_PLANNING uses LoptOptimizeJoinRule, not Calcite's
> > > > > MultiJoinOptimizeBushyRule.
> > > > >
> > > >
> > >
> >
>
  
