[ 
https://issues.apache.org/jira/browse/HIVE-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13714760#comment-13714760
 ] 

Yin Huai commented on HIVE-4827:
--------------------------------

Hi Edward, let me explain with an example.

Suppose we have a query like the one shown in the description:
{code}
SELECT tmp1.key
FROM (SELECT x1.key2 AS key
      FROM bigTable1 x1 JOIN smallTable1 y1 ON (x1.key1 = y1.key1)
      UNION ALL
      SELECT x2.key2 AS key
      FROM bigTable2 x2 JOIN smallTable2 y2 ON (x2.key1 = y2.key1)) tmp1
{code}
With this patch, a single Map-only job is enough to evaluate it. Because the
two big tables (bigTable1 and bigTable2) are separate input paths, only a
single operation path is initialized in each Map task: some tasks process the
MapJoin of bigTable1 and smallTable1, and the other tasks process the MapJoin
of bigTable2 and smallTable2. Each task therefore loads only one hash table, so
I think we will not increase the memory footprint of a Map task in this case.
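A toy model of the reasoning above, assuming a simplified plan in which each input path maps to the (alias, small table) pairs whose MapJoin scans that path. The function name `hash_tables_per_task` and the dictionary layout are hypothetical and are not Hive's code; the sketch only shows that when the two big tables are different paths, a Map task reading either one loads a single hash table.

```python
# Toy model, NOT Hive's implementation: each input path maps to the
# (alias, small table) pairs whose MapJoin reads that path.
def hash_tables_per_task(path_to_aliases):
    """For each input path, list the small-table hash tables that a Map task
    reading a split of that path would have to load."""
    return {
        path: sorted({small for _alias, small in aliases})
        for path, aliases in path_to_aliases.items()
    }


# First query: two different big tables, hence two different input paths.
plan = {
    "bigTable1": [("x1", "smallTable1")],
    "bigTable2": [("x2", "smallTable2")],
}

for path, tables in hash_tables_per_task(plan).items():
    print(path, tables)
# bigTable1 ['smallTable1']
# bigTable2 ['smallTable2']
```

Every task loads exactly one hash table, which matches the claim that the merged job does not raise the per-task memory footprint for this query.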

If we change this query slightly, so that both branches join against bigTable1:
{code}
SELECT tmp1.key
FROM (SELECT x1.key2 AS key
      FROM bigTable1 x1 JOIN smallTable1 y1 ON (x1.key1 = y1.key1)
      UNION ALL
      SELECT x2.key2 AS key
      FROM bigTable1 x2 JOIN smallTable2 y2 ON (x2.key1 = y2.key1)) tmp1
{code}
then, because we use a single big table (bigTable1), multiple aliases can be
associated with a single input path. In this case, multiple
TableScanOperators will be connected to a single MapOperator, because we scan
the table only once. Since the same Map task evaluates both the MapJoin of
bigTable1 and smallTable1 and the MapJoin of bigTable1 and smallTable2, it has
a higher memory footprint than a Map task that processes only a single MapJoin
(because it loads more hash tables). I think we still want to use a single
Map-only job to evaluate this query, because we get a shared scan and do not
need to materialize intermediate results and load them back again just for the
UNION ALL. HIVE-3996 seems related; I will check it and add the logic to
enforce the memory limit when we have a shared scan.
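The same kind of toy model, extended with a memory check, illustrates what enforcing a limit under a shared scan would have to compute: the hash tables for every alias on the shared path are loaded in one task, so their sizes add up. `shared_scan_report`, the table sizes, and the limit are all hypothetical, chosen only for illustration; they are not values or APIs from Hive or HIVE-3996.

```python
# Toy model, NOT Hive's implementation. Sizes and the limit below are
# made-up numbers used only for illustration.
def shared_scan_report(path_to_aliases, table_sizes, limit_bytes):
    """For each input path, return (hash tables loaded, total bytes, fits?)
    for a Map task that scans that path once and serves every alias on it."""
    report = {}
    for path, aliases in path_to_aliases.items():
        tables = sorted({small for _alias, small in aliases})
        total = sum(table_sizes[t] for t in tables)
        report[path] = (tables, total, total <= limit_bytes)
    return report


# Second query: both aliases (x1, x2) read bigTable1, so one shared scan.
plan = {"bigTable1": [("x1", "smallTable1"), ("x2", "smallTable2")]}
sizes = {"smallTable1": 40_000_000, "smallTable2": 30_000_000}  # hypothetical
limit = 50_000_000  # hypothetical per-task hash table budget

tables, total, fits = shared_scan_report(plan, sizes, limit)["bigTable1"]
print(tables, total, fits)
# ['smallTable1', 'smallTable2'] 70000000 False
```

Here the shared scan would need about 70 MB of hash tables against a 50 MB budget, so the merge would exceed the limit; how the optimizer should react in that case is a design decision for the actual patch.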



                
> Merge a Map-only job to its following MapReduce job with multiple inputs
> ------------------------------------------------------------------------
>
>                 Key: HIVE-4827
>                 URL: https://issues.apache.org/jira/browse/HIVE-4827
>             Project: Hive
>          Issue Type: Improvement
>          Components: Query Processor
>    Affects Versions: 0.12.0
>            Reporter: Yin Huai
>            Assignee: Yin Huai
>         Attachments: HIVE-4827.1.patch
>
>
> When hive.optimize.mapjoin.mapreduce is on, CommonJoinResolver can attach a 
> Map-only job (MapJoin) to its following MapReduce job. However, this merge 
> only happens when the MapReduce job has a single input. With Correlation 
> Optimizer (HIVE-2206), the MapReduce job can have multiple inputs (for 
> multiple operation paths). It is desirable to improve CommonJoinResolver so 
> that it merges a Map-only job into the corresponding Map task of the 
> MapReduce job.
> Example:
> {code:sql}
> set hive.optimize.correlation=true;
> set hive.auto.convert.join=true;
> set hive.optimize.mapjoin.mapreduce=true;
> SELECT tmp1.key, count(*)
> FROM (SELECT x1.key1 AS key
>       FROM bigTable1 x1 JOIN smallTable1 y1 ON (x1.key1 = y1.key1)
>       GROUP BY x1.key1) tmp1
> JOIN (SELECT x2.key2 AS key
>       FROM bigTable2 x2 JOIN smallTable2 y2 ON (x2.key2 = y2.key2)
>       GROUP BY x2.key2) tmp2
> ON (tmp1.key = tmp2.key)
> GROUP BY tmp1.key;
> {code}
> In this query, the join operations inside tmp1 and tmp2 will be converted to 
> two MapJoins. With Correlation Optimizer, the aggregations in tmp1 and tmp2, 
> the join of tmp1 and tmp2, and the last aggregation will all be executed in 
> the same MapReduce job (on the Reduce side). Since this MapReduce job has two 
> inputs, CommonJoinResolver currently cannot attach the two MapJoins to the 
> Map side of the MapReduce job.
> Another example:
> {code:sql}
> SELECT tmp1.key
> FROM (SELECT x1.key2 AS key
>       FROM bigTable1 x1 JOIN smallTable1 y1 ON (x1.key1 = y1.key1)
>       UNION ALL
>       SELECT x2.key2 AS key
>       FROM bigTable2 x2 JOIN smallTable2 y2 ON (x2.key1 = y2.key1)) tmp1
> {code}
> In this case, we will have three Map-only jobs (two for the MapJoins and one 
> for the Union). It would be better to use a single Map-only job to execute 
> this query.

--
This message is automatically generated by JIRA.
