[ https://issues.apache.org/jira/browse/SPARK-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yi Zhou updated SPARK-15219:
----------------------------
    Description: 
We observed an interesting behavior of broadcast hash join (similar to map join 
in Hive) when comparing Spark SQL against the Hive on MR engine. The query below 
is a multi-way join over 3 inputs: the product_reviews table plus 2 run-time 
temporary result tables (fsr and fwr) produced by 'select' subqueries, and each 
of 'fsr' and 'fwr' in turn contains a two-way join (1 table and 1 run-time 
temporary table). This query runs slower on Spark SQL than on Hive on MR. 
Comparing the two engines, we found that Hive on MR executes a total of 5 map 
join tasks (with tuned map join parameters), while Spark SQL produces only 2 
broadcast hash join tasks, even when we set a large threshold (e.g., 1 GB) for 
broadcast hash join. From our investigation, it seems that when a run-time 
temporary table participates in a join, the Spark SQL engine does not detect it 
as a candidate for the broadcast hash join optimization. 
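
For reference, this is roughly how we raised the threshold and inspected the 
physical plan (a minimal spark-shell sketch against the Spark 1.6/2.0-era API; 
q19Sql is a placeholder variable holding the SELECT below):
{code}
// Raise the auto-broadcast threshold to 1 GB, then look at the physical plan:
// only 2 of the expected 5 joins show up as BroadcastHashJoin operators.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold",
  (1024L * 1024 * 1024).toString)

val df = sqlContext.sql(q19Sql) // q19Sql: placeholder for the query below
df.explain()                    // prints the physical plan
{code}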

Core SQL snippet:
{code}
INSERT INTO TABLE q19_spark_sql_power_test_0_result
SELECT *
FROM
( --wrap in additional FROM(), because sorting/distribute by with a UDTF in the select clause is not allowed
  SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
  (
    item_sk,
    review_sentence,
    sentiment,
    sentiment_word
  )
  FROM product_reviews pr,
  (
    --store returns in week ending given date
    SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
    FROM store_returns sr,
    (
      -- within the week ending a given date
      SELECT d1.d_date_sk
      FROM date_dim d1, date_dim d2
      WHERE d1.d_week_seq = d2.d_week_seq
      AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
    ) sr_dateFilter
    WHERE sr.sr_returned_date_sk = d_date_sk
    GROUP BY sr_item_sk --across all store and web channels
    HAVING sr_item_qty > 0
  ) fsr,
  (
    --web returns in week ending given date
    SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
    FROM web_returns wr,
    (
      -- within the week ending a given date
      SELECT d1.d_date_sk
      FROM date_dim d1, date_dim d2
      WHERE d1.d_week_seq = d2.d_week_seq
      AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
    ) wr_dateFilter
    WHERE wr.wr_returned_date_sk = d_date_sk
    GROUP BY wr_item_sk  --across all store and web channels
    HAVING wr_item_qty > 0
  ) fwr
  WHERE fsr.sr_item_sk = fwr.wr_item_sk
  AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
  -- equivalent across all store and web channels (within a tolerance of +/- 10%)
  AND abs( (sr_item_qty-wr_item_qty)/ ((sr_item_qty+wr_item_qty)/2)) <= 0.1
) extractedSentiments
WHERE sentiment = 'NEG' --if there are any major negative reviews.
ORDER BY item_sk,review_sentence,sentiment,sentiment_word
;
{code}
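
A possible workaround sketch (not verified against this workload): build the 
two derived tables as DataFrames and mark them with the explicit broadcast 
hint, which bypasses the size-based detection that misses the run-time 
temporary results. fsrSubquery and fwrSubquery are placeholders for the 'fsr' 
and 'fwr' SELECTs above:
{code}
import org.apache.spark.sql.functions.broadcast

val fsr = sqlContext.sql(fsrSubquery) // placeholder: the 'fsr' derived table
val fwr = sqlContext.sql(fwrSubquery) // placeholder: the 'fwr' derived table
val pr  = sqlContext.table("product_reviews")

// broadcast() marks a DataFrame as small enough to broadcast, forcing a
// broadcast hash join regardless of the threshold-based size estimate.
val joined = pr
  .join(broadcast(fsr), pr("pr_item_sk") === fsr("sr_item_sk"))
  .join(broadcast(fwr), fsr("sr_item_sk") === fwr("wr_item_sk"))
{code}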



> [Spark SQL] Run-time temporary tables are not detected for enabling broadcast 
> hash join optimization
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-15219
>                 URL: https://issues.apache.org/jira/browse/SPARK-15219
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Yi Zhou
>


