[ https://issues.apache.org/jira/browse/SPARK-40626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuming Wang updated SPARK-40626:
--------------------------------
    Attachment: Pull out complex join condition and infer more filters.png

> Reorder join keys impact performance
> ------------------------------------
>
>                 Key: SPARK-40626
>                 URL: https://issues.apache.org/jira/browse/SPARK-40626
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Yuming Wang
>            Priority: Major
>         Attachments: Pull out complex join condition and infer more filters.png,
>                      Pull out complex join condition.png,
>                      change the aggregate key order.png,
>                      default(join order will change).png
>
>
> {code:scala}
> sql("CREATE TABLE t1 (itemid BIGINT, eventType STRING, dt STRING) USING parquet PARTITIONED BY (dt)")
> sql("CREATE TABLE t2 (cal_dt DATE, item_id BIGINT) using parquet")
> sql("set spark.sql.autoBroadcastJoinThreshold=-1")
> sql(
>   """
>     |SELECT itemid,
>     |       eventtype
>     |FROM   t1 a
>     |       INNER JOIN (SELECT DISTINCT cal_dt,
>     |                                   item_id
>     |                   FROM   t2) b
>     |               ON a.itemid = b.item_id
>     |                  AND To_date(a.dt, 'yyyyMMdd') = b.cal_dt
>   """.stripMargin).explain()
> {code}
> The plan:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [itemid#10L, eventtype#11]
>    +- SortMergeJoin [cast(gettimestamp(dt#12, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date), itemid#10L], [cal_dt#13, item_id#14L], Inner
>       :- Sort [cast(gettimestamp(dt#12, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date) ASC NULLS FIRST, itemid#10L ASC NULLS FIRST], false, 0
>       :  +- Exchange hashpartitioning(cast(gettimestamp(dt#12, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date), itemid#10L, 5), ENSURE_REQUIREMENTS, [plan_id=48]
>       :     +- Filter isnotnull(itemid#10L)
>       :        +- FileScan parquet spark_catalog.default.t1[itemid#10L,eventType#11,dt#12]
>       +- Sort [cal_dt#13 ASC NULLS FIRST, item_id#14L ASC NULLS FIRST], false, 0
>          +- HashAggregate(keys=[cal_dt#13, item_id#14L], functions=[])
>             +- Exchange hashpartitioning(cal_dt#13, item_id#14L, 5), ENSURE_REQUIREMENTS, [plan_id=44]
>                +- HashAggregate(keys=[cal_dt#13, item_id#14L], functions=[])
>                   +- Filter (isnotnull(item_id#14L) AND isnotnull(cal_dt#13))
>                      +- FileScan parquet spark_catalog.default.t2[cal_dt#13,item_id#14L]
> {noformat}
> Default: 38 min
> Do not reorder join keys: 8.1 min
> Pull out complex join condition: 3.6 min
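
For readers who want to experiment with the "pull out complex join condition" idea by hand, the following is a minimal sketch, assuming the `t1` and `t2` tables from the reproduction above and a local Spark session. It computes `To_date(dt, 'yyyyMMdd')` in a subquery so that both join keys are plain columns. The alias `dt_date`, the object name, and the app name are hypothetical, and this manual rewrite is only an approximation: it is not necessarily what the optimization measured in the issue does internally, and it may not reproduce the reported timings.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver object; only the table definitions and the setting
// come from the issue, the rest is illustrative.
object PullOutJoinConditionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-40626-sketch") // hypothetical app name
      .master("local[*]")
      .getOrCreate()
    import spark.sql

    // Same tables and setting as in the issue's reproduction.
    sql("CREATE TABLE IF NOT EXISTS t1 (itemid BIGINT, eventType STRING, dt STRING) " +
      "USING parquet PARTITIONED BY (dt)")
    sql("CREATE TABLE IF NOT EXISTS t2 (cal_dt DATE, item_id BIGINT) USING parquet")
    sql("set spark.sql.autoBroadcastJoinThreshold=-1")

    // Manual rewrite: evaluate the date conversion once in a subquery
    // (alias dt_date is an assumption) and join on plain columns.
    sql(
      """
        |SELECT a.itemid,
        |       a.eventtype
        |FROM   (SELECT itemid,
        |               eventtype,
        |               To_date(dt, 'yyyyMMdd') AS dt_date
        |        FROM   t1) a
        |       INNER JOIN (SELECT DISTINCT cal_dt,
        |                                   item_id
        |                   FROM   t2) b
        |               ON a.itemid = b.item_id
        |                  AND a.dt_date = b.cal_dt
      """.stripMargin).explain()

    spark.stop()
  }
}
```

Comparing the `explain()` output of this query with the plan quoted in the issue shows whether the join and exchange keys now reference the projected column rather than the full `cast(gettimestamp(...))` expression.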