[ https://issues.apache.org/jira/browse/SPARK-40626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuming Wang updated SPARK-40626:
--------------------------------
    Description: 
{code:scala}
sql("CREATE TABLE t1 (itemid BIGINT, eventType STRING, dt STRING) USING parquet PARTITIONED BY (dt)")
sql("CREATE TABLE t2 (cal_dt DATE, item_id BIGINT) USING parquet")

// Disable broadcast joins so the planner must pick a sort-merge join
sql("set spark.sql.autoBroadcastJoinThreshold=-1")

sql(
  """
    |SELECT itemid,
    |       eventtype
    |FROM   t1 a
    |       INNER JOIN (SELECT DISTINCT cal_dt,
    |                                   item_id
    |                   FROM   t2) b
    |               ON a.itemid = b.item_id
    |                  AND To_date(a.dt, 'yyyyMMdd') = b.cal_dt
  """.stripMargin).explain()
{code}

The plan:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [itemid#10L, eventtype#11]
   +- SortMergeJoin [cast(gettimestamp(dt#12, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date), itemid#10L], [cal_dt#13, item_id#14L], Inner
      :- Sort [cast(gettimestamp(dt#12, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date) ASC NULLS FIRST, itemid#10L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(cast(gettimestamp(dt#12, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date), itemid#10L, 5), ENSURE_REQUIREMENTS, [plan_id=48]
      :     +- Filter isnotnull(itemid#10L)
      :        +- FileScan parquet spark_catalog.default.t1[itemid#10L,eventType#11,dt#12]
      +- Sort [cal_dt#13 ASC NULLS FIRST, item_id#14L ASC NULLS FIRST], false, 0
         +- HashAggregate(keys=[cal_dt#13, item_id#14L], functions=[])
            +- Exchange hashpartitioning(cal_dt#13, item_id#14L, 5), ENSURE_REQUIREMENTS, [plan_id=44]
               +- HashAggregate(keys=[cal_dt#13, item_id#14L], functions=[])
                  +- Filter (isnotnull(item_id#14L) AND isnotnull(cal_dt#13))
                     +- FileScan parquet spark_catalog.default.t2[cal_dt#13,item_id#14L]
{noformat}

Note that the join keys in the plan are reordered to [to_date(dt), itemid] = [cal_dt, item_id], not the [itemid = item_id, to_date(dt) = cal_dt] order written in the query, apparently to match the hashpartitioning(cal_dt, item_id) produced by the DISTINCT aggregate. As a result the expensive gettimestamp expression becomes the leading shuffle and sort key on the t1 side. Observed runtimes for the same workload:
* Default (join keys reordered): 38 min
* Do not reorder join keys: 8.1 min
* Pull out the complex join condition: 3.6 min
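For reference, a minimal sketch of the "pull out complex join condition" variant as a hand-written SQL rewrite (the subquery shape and the event_dt alias are illustrative, not part of the original repro): computing To_date once in a projection below the join leaves only plain attribute references as join keys.
{code:scala}
// Hand-rewritten variant of the repro query: To_date(dt, 'yyyyMMdd') is
// evaluated once in a subquery projection, so the join keys (and hence the
// shuffle and sort keys) are simple columns. `event_dt` is an illustrative
// alias, not part of the original reproduction.
sql(
  """
    |SELECT itemid,
    |       eventtype
    |FROM   (SELECT itemid,
    |               eventtype,
    |               To_date(dt, 'yyyyMMdd') AS event_dt
    |        FROM   t1) a
    |       INNER JOIN (SELECT DISTINCT cal_dt,
    |                                   item_id
    |                   FROM   t2) b
    |               ON a.itemid = b.item_id
    |                  AND a.event_dt = b.cal_dt
  """.stripMargin).explain()
{code}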


> Reordering join keys impacts performance
> -----------------------------------------
>
>                 Key: SPARK-40626
>                 URL: https://issues.apache.org/jira/browse/SPARK-40626
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Yuming Wang
>            Priority: Major
>         Attachments: Pull out complex join condition.png, change the aggregate key order.png, default(join order will change).png
>


