Yi Zhou created SPARK-15219:
-------------------------------

             Summary: [Spark SQL] Runtime temporary tables are not detected for enabling broadcast hash join optimization
                 Key: SPARK-15219
                 URL: https://issues.apache.org/jira/browse/SPARK-15219
             Project: Spark
          Issue Type: Improvement
          Components: SQL
            Reporter: Yi Zhou


We observed an interesting difference in broadcast hash join behavior (similar to map join in Hive) when comparing Spark SQL with the Hive on MR engine. The query below is a multi-way join over 3 inputs: the product_reviews table and 2 run-time temporary result tables (fsr and fwr) produced by SELECT subqueries; each of fsr and fwr in turn contains a two-way join between 1 base table and 1 run-time temporary table. Comparing the two engines, Hive on MR generates a total of 5 map join tasks with tuned map join parameters, while Spark SQL generates only 2 broadcast hash join tasks even when we set a larger broadcast threshold (e.g., 1 GB). From our investigation, it seems that when a run-time temporary table participates in a join, the Spark SQL engine does not detect it as a candidate for the broadcast hash join optimization.
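
For reference, the larger broadcast threshold can be applied by raising spark.sql.autoBroadcastJoinThreshold before running the query. A minimal sketch (the 1 GB value is just our test setup, not a recommended default):
{code}
-- Raise the auto-broadcast threshold (bytes; ~1 GB here) so that small join
-- inputs are eligible for broadcast hash join.
SET spark.sql.autoBroadcastJoinThreshold=1073741824;
{code}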

Core SQL snippet:
{code}
INSERT INTO TABLE q19_spark_sql_power_test_0_result
SELECT *
FROM
( --wrap in additional FROM(), because Sorting/distribute by with UDTF in select clause is not allowed
  SELECT extract_sentiment(pr.pr_item_sk, pr.pr_review_content) AS
  (
    item_sk,
    review_sentence,
    sentiment,
    sentiment_word
  )
  FROM product_reviews pr,
  (
    --store returns in week ending given date
    SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
    FROM store_returns sr,
    (
      -- within the week ending a given date
      SELECT d1.d_date_sk
      FROM date_dim d1, date_dim d2
      WHERE d1.d_week_seq = d2.d_week_seq
      AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
    ) sr_dateFilter
    WHERE sr.sr_returned_date_sk = d_date_sk
    GROUP BY sr_item_sk --across all store and web channels
    HAVING sr_item_qty > 0
  ) fsr,
  (
    --web returns in week ending given date
    SELECT wr_item_sk, SUM(wr_return_quantity) wr_item_qty
    FROM web_returns wr,
    (
      -- within the week ending a given date
      SELECT d1.d_date_sk
      FROM date_dim d1, date_dim d2
      WHERE d1.d_week_seq = d2.d_week_seq
      AND d2.d_date IN ( '2004-03-8' ,'2004-08-02' ,'2004-11-15', '2004-12-20' )
    ) wr_dateFilter
    WHERE wr.wr_returned_date_sk = d_date_sk
    GROUP BY wr_item_sk  --across all store and web channels
    HAVING wr_item_qty > 0
  ) fwr
  WHERE fsr.sr_item_sk = fwr.wr_item_sk
  AND pr.pr_item_sk = fsr.sr_item_sk --extract product_reviews for found items
  -- equivalent across all store and web channels (within a tolerance of +/- 10%)
  AND abs( (sr_item_qty-wr_item_qty)/ ((sr_item_qty+wr_item_qty)/2)) <= 0.1
) extractedSentiments
WHERE sentiment = 'NEG' --if there are any major negative reviews.
ORDER BY item_sk,review_sentence,sentiment,sentiment_word
;
{code}
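
To check which joins the planner actually turns into broadcast hash joins, the physical plan can be inspected with EXPLAIN. A minimal sketch on a reduced form of the query (not the full statement above); based on the behavior reported here, the expectation is that the join against the run-time temporary (sub-query) result does not show up as a broadcast hash join even with the raised threshold:
{code}
-- Sketch: EXPLAIN a join between a base table and a run-time temporary
-- (sub-query) result to see whether a broadcast hash join is chosen for it.
EXPLAIN
SELECT pr.pr_item_sk
FROM product_reviews pr
JOIN (
  SELECT sr_item_sk, SUM(sr_return_quantity) sr_item_qty
  FROM store_returns
  GROUP BY sr_item_sk
) fsr
ON pr.pr_item_sk = fsr.sr_item_sk;
{code}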


