[ 
https://issues.apache.org/jira/browse/DRILL-1987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14274814#comment-14274814
 ] 

Aman Sinha commented on DRILL-1987:
-----------------------------------

This does not appear to be an issue with the hash join itself; it seems to be 
an issue with hash partitioning and sorting a very large number of records 
(about 66 billion).  The query joins a 1M-row table with itself on a join key 
that has only 15 unique values, so each key value occurs in approx 
1000000/15 = 66666 rows.  Each of those rows matches every row with the same 
key, so the join produces about 15 * 66666^2, i.e. close to 66 billion records.  
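
As a rough check of the estimate above, here is a minimal sketch of the 
arithmetic.  It assumes the 1M rows are spread as evenly as possible over the 
15 key values (I have not verified the real distribution), and the function 
name is just for illustration:

```python
def self_join_output_rows(total_rows: int, distinct_keys: int) -> int:
    """Estimate the output size of an equi self-join on a low-cardinality key.

    Assumption: rows are distributed as evenly as possible across the keys.
    A key that occurs n times contributes n * n output rows.
    """
    base, extra = divmod(total_rows, distinct_keys)
    # `extra` keys hold one more row than the remaining keys.
    return extra * (base + 1) ** 2 + (distinct_keys - extra) * base ** 2


rows = self_join_output_rows(1_000_000, 15)
print(f"{rows:,} rows, about {rows / 1e9:.1f} billion")
```

A skewed key distribution would only increase this number, since the sum of 
squares is smallest when the groups are equal in size.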

The hash join itself completes relatively quickly, but the subsequent hash 
partitioning and TopN sort have to process 66 B records, which together appear 
to take a very long time.  If I remove the ORDER BY but keep the LIMIT, the 
query returns in about 9 seconds (although I saw some errors in the log that I 
need to follow up on).
 
I haven't determined why the MergeJoin plan runs much faster.  I looked at the 
query profile, and the row count for the output of MergeJoin is much lower, 
which seems to suggest a wrong result, so this needs some more investigation. 



> join with tons of duplicates hangs with hash join
> -------------------------------------------------
>
>                 Key: DRILL-1987
>                 URL: https://issues.apache.org/jira/browse/DRILL-1987
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Execution - Flow
>    Affects Versions: 0.8.0
>            Reporter: Chun Chang
>            Assignee: Aman Sinha
>
> #Fri Jan 09 20:39:31 EST 2015
> git.commit.id.abbrev=487d98e
> With hash join enabled (default), the following join query hangs (running for 
> about 30 min now). The join condition has mostly duplicates. Each table has 1 
> million rows. Data can be downloaded here:
> https://s3.amazonaws.com/apache-drill/files/complex.json.gz
> {code}
> 0: jdbc:drill:schema=dfs.drillTestDirComplexJ> alter session set 
> `planner.enable_mergejoin` = false;
> +------------+------------+
> |     ok     |  summary   |
> +------------+------------+
> | true       | planner.enable_mergejoin updated. |
> +------------+------------+
> 1 row selected (0.025 seconds)
> 0: jdbc:drill:schema=dfs.drillTestDirComplexJ> alter session set 
> `planner.enable_hashjoin` = true;
> +------------+------------+
> |     ok     |  summary   |
> +------------+------------+
> | true       | planner.enable_hashjoin updated. |
> +------------+------------+
> 1 row selected (0.045 seconds)
> 0: jdbc:drill:schema=dfs.drillTestDirComplexJ> select a.id, b.gbyi, a.str 
> from `complex.json` a inner join `complex.json` b on a.gbyi=b.gbyi order by 
> a.id limit 20;
> +------------+------------+------------+
> |     id     |    gbyi    |    str     |
> +------------+------------+------------+
> {code}
> physical plan:
> {code}
> 0: jdbc:drill:schema=dfs.drillTestDirComplexJ> explain plan for select a.id, 
> b.gbyi, a.str from `complex.json` a inner join `complex.json` b on 
> a.gbyi=b.gbyi order by a.id limit 20;
> +------------+------------+
> |    text    |    json    |
> +------------+------------+
> | 00-00    Screen
> 00-01      Project(id=[$0], gbyi=[$1], str=[$2])
> 00-02        SelectionVectorRemover
> 00-03          Limit(fetch=[20])
> 00-04            SingleMergeExchange(sort0=[0 ASC])
> 01-01              SelectionVectorRemover
> 01-02                TopN(limit=[20])
> 01-03                  HashToRandomExchange(dist0=[[$0]])
> 02-01                    Project(id=[$1], gbyi=[$3], str=[$2])
> 02-02                      HashJoin(condition=[=($0, $3)], joinType=[inner])
> 02-04                        HashToRandomExchange(dist0=[[$0]])
> 03-01                          Project(gbyi=[$0], id=[$2], str=[$1])
> 03-02                            Scan(groupscan=[EasyGroupScan 
> [selectionRoot=/drill/testdata/complex_type/json/complex.json, numFiles=1, 
> columns=[`gbyi`, `id`, `str`], 
> files=[maprfs:/drill/testdata/complex_type/json/complex.json]]])
> 02-03                        Project(gbyi0=[$0])
> 02-05                          HashToRandomExchange(dist0=[[$0]])
> 04-01                            Scan(groupscan=[EasyGroupScan 
> [selectionRoot=/drill/testdata/complex_type/json/complex.json, numFiles=1, 
> columns=[`gbyi`], 
> files=[maprfs:/drill/testdata/complex_type/json/complex.json]]])
> {code}
> If I turn merge join on, the query finishes rather quickly, like within a 
> minute.
> {code}
> 0: jdbc:drill:schema=dfs.drillTestDirComplexJ> alter session set 
> `planner.enable_hashjoin` = false;
> +------------+------------+
> |     ok     |  summary   |
> +------------+------------+
> | true       | planner.enable_hashjoin updated. |
> +------------+------------+
> 1 row selected (0.026 seconds)
> 0: jdbc:drill:schema=dfs.drillTestDirComplexJ> alter session set 
> `planner.enable_mergejoin` = true;
> +------------+------------+
> |     ok     |  summary   |
> +------------+------------+
> | true       | planner.enable_mergejoin updated. |
> +------------+------------+
> 1 row selected (0.024 seconds)
> 0: jdbc:drill:schema=dfs.drillTestDirComplexJ> explain plan for select a.id, 
> b.gbyi, a.str from `complex.json` a inner join `complex.json` b on 
> a.gbyi=b.gbyi order by a.id limit 20;
> +------------+------------+
> |    text    |    json    |
> +------------+------------+
> | 00-00    Screen
> 00-01      Project(id=[$0], gbyi=[$1], str=[$2])
> 00-02        SelectionVectorRemover
> 00-03          Limit(fetch=[20])
> 00-04            SingleMergeExchange(sort0=[0 ASC])
> 01-01              SelectionVectorRemover
> 01-02                TopN(limit=[20])
> 01-03                  HashToRandomExchange(dist0=[[$0]])
> 02-01                    Project(id=[$1], gbyi=[$3], str=[$2])
> 02-02                      MergeJoin(condition=[=($0, $3)], joinType=[inner])
> 02-04                        SelectionVectorRemover
> 02-06                          Sort(sort0=[$0], dir0=[ASC])
> 02-08                            HashToRandomExchange(dist0=[[$0]])
> 03-01                              Project(gbyi=[$0], id=[$2], str=[$1])
> 03-02                                Scan(groupscan=[EasyGroupScan 
> [selectionRoot=/drill/testdata/complex_type/json/complex.json, numFiles=1, 
> columns=[`gbyi`, `id`, `str`], 
> files=[maprfs:/drill/testdata/complex_type/json/complex.json]]])
> 02-03                        Project(gbyi0=[$0])
> 02-05                          SelectionVectorRemover
> 02-07                            Sort(sort0=[$0], dir0=[ASC])
> 02-09                              HashToRandomExchange(dist0=[[$0]])
> 04-01                                Scan(groupscan=[EasyGroupScan 
> [selectionRoot=/drill/testdata/complex_type/json/complex.json, numFiles=1, 
> columns=[`gbyi`], 
> files=[maprfs:/drill/testdata/complex_type/json/complex.json]]])
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
