[ https://issues.apache.org/jira/browse/SPARK-33302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan reassigned SPARK-33302: ----------------------------------- Assignee: angerszhu > Failed to push down filters through Expand > ------------------------------------------ > > Key: SPARK-33302 > URL: https://issues.apache.org/jira/browse/SPARK-33302 > Project: Spark > Issue Type: Improvement > Components: SQL > Affects Versions: 2.3.4, 3.0.1, 3.1.0 > Reporter: Yuming Wang > Assignee: angerszhu > Priority: Major > > How to reproduce this issue: > {code:sql} > create table SPARK_33302_1(pid int, uid int, sid int, dt date, suid int) > using parquet; > create table SPARK_33302_2(pid int, vs int, uid int, csid int) using parquet; > SELECT > years, > appversion, > SUM(uusers) AS users > FROM (SELECT > Date_trunc('year', dt) AS years, > CASE > WHEN h.pid = 3 THEN 'iOS' > WHEN h.pid = 4 THEN 'Android' > ELSE 'Other' > END AS viewport, > h.vs AS appversion, > Count(DISTINCT u.uid) AS uusers > ,Count(DISTINCT u.suid) AS srcusers > FROM SPARK_33302_1 u > join SPARK_33302_2 h > ON h.uid = u.uid > GROUP BY 1, > 2, > 3) AS a > WHERE viewport = 'iOS' > GROUP BY 1, > 2 > {code} > {noformat} > == Physical Plan == > *(5) HashAggregate(keys=[years#30, appversion#32], > functions=[sum(uusers#33L)]) > +- Exchange hashpartitioning(years#30, appversion#32, 200), true, [id=#251] > +- *(4) HashAggregate(keys=[years#30, appversion#32], > functions=[partial_sum(uusers#33L)]) > +- *(4) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS > TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN > 'Android' ELSE 'Other' END#46, vs#12], functions=[count(if ((gid#44 = 1)) > u.`uid`#47 else null)]) > +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS > TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN > 'Android' ELSE 'Other' END#46, vs#12, 200), true, [id=#246] > +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS > TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN > 'Android' ELSE 'Other' 
END#46, vs#12], functions=[partial_count(if ((gid#44 = > 1)) u.`uid`#47 else null)]) > +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS > TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN > 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44], > functions=[]) > +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` > AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN > 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44, 200), > true, [id=#241] > +- *(2) HashAggregate(keys=[date_trunc('year', > CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN > (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, > u.`suid`#48, gid#44], functions=[]) > +- *(2) Filter (CASE WHEN (h.`pid` = 3) THEN 'iOS' > WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46 = iOS) > +- *(2) Expand [ArrayBuffer(date_trunc(year, > cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS > WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, uid#7, null, 1), > ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE > WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, > vs#12, null, suid#10, 2)], [date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, > CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE > 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44] > +- *(2) Project [uid#7, dt#9, suid#10, pid#11, > vs#12] > +- *(2) BroadcastHashJoin [uid#7], [uid#13], > Inner, BuildRight > :- *(2) Project [uid#7, dt#9, suid#10] > : +- *(2) Filter isnotnull(uid#7) > : +- *(2) ColumnarToRow > : +- FileScan parquet > default.spark_33301_1[uid#7,dt#9,suid#10] Batched: true, DataFilters: > [isnotnull(uid#7)], Format: Parquet, Location: > InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/spark_33301_1], > PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: 
> struct<uid:int,dt:date,suid:int> > +- BroadcastExchange > HashedRelationBroadcastMode(List(cast(input[2, int, true] as bigint))), > [id=#233] > +- *(1) Project [pid#11, vs#12, uid#13] > +- *(1) Filter isnotnull(uid#13) > +- *(1) ColumnarToRow > +- FileScan parquet > default.spark_33301_2[pid#11,vs#12,uid#13] Batched: true, DataFilters: > [isnotnull(uid#13)], Format: Parquet, Location: > InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/spark_33301_2], > PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: > struct<pid:int,vs:int,uid:int> > {noformat} > We should be able to push down {{Filter (CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = > 4) THEN 'Android' ELSE 'Other' END#46 = iOS)}} through the {{Expand}} node, but as the plan above shows, it currently is not pushed down. -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org