Github user uzadude commented on the issue:
https://github.com/apache/spark/pull/22964
This is the original query. In the physical plan, the explode (Generate) runs first and the shuffle (Exchange) follows it:
```
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val N = 1 << 12
spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")
val tokens = spark.range(0, N, 1, 2).selectExpr(
    "floor(id/4) as key", "'asd/asd/asd/asd/asd/asd' as url")
  .selectExpr("*", "explode(split(url, '/')) as token")
val w = Window.partitionBy("key", "token")
val res = tokens.withColumn("cnt", count("token").over(w))
res.explain(true)
== Optimized Logical Plan ==
Window [count(token#11) windowspecdefinition(key#6L, token#11,
    specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()))
    AS cnt#17L], [key#6L, token#11]
+- Generate explode([asd,asd,asd,asd,asd,asd]), false, [token#11]
   +- Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L,
          asd/asd/asd/asd/asd/asd AS url#7]
      +- Range (0, 4096, step=1, splits=Some(2))

== Physical Plan ==
Window [count(token#11) windowspecdefinition(key#6L, token#11,
    specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()))
    AS cnt#17L], [key#6L, token#11]
+- *(2) Sort [key#6L ASC NULLS FIRST, token#11 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key#6L, token#11, 1)
      +- Generate explode([asd,asd,asd,asd,asd,asd]), [key#6L, url#7],
             false, [token#11]
         +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L,
                asd/asd/asd/asd/asd/asd AS url#7]
            +- *(1) Range (0, 4096, step=1, splits=2)
```
If we add a repartition on `key` before the generate:
```
val tokens = spark.range(0, N, 1, 2).selectExpr(
    "floor(id/4) as key", "'asd/asd/asd/asd/asd/asd' as url")
  .repartition(col("key"))
  .selectExpr("*", "explode(split(url, '/')) as token")
val w = Window.partitionBy("key", "token")
val res = tokens.withColumn("cnt", count("token").over(w))
res.explain(true)
== Optimized Logical Plan ==
Window [count(token#11) windowspecdefinition(key#6L, token#11,
    specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()))
    AS cnt#17L], [key#6L, token#11]
+- Generate explode([asd,asd,asd,asd,asd,asd]), false, [token#11]
   +- RepartitionByExpression [key#6L], 1
      +- Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L,
             asd/asd/asd/asd/asd/asd AS url#7]
         +- Range (0, 4096, step=1, splits=Some(2))

== Physical Plan ==
Window [count(token#11) windowspecdefinition(key#6L, token#11,
    specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()))
    AS cnt#17L], [key#6L, token#11]
+- *(2) Sort [key#6L ASC NULLS FIRST, token#11 ASC NULLS FIRST], false, 0
   +- Generate explode([asd,asd,asd,asd,asd,asd]), [key#6L, url#7], false,
          [token#11]
      +- Exchange hashpartitioning(key#6L, 1)
         +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L,
                asd/asd/asd/asd/asd/asd AS url#7]
            +- *(1) Range (0, 4096, step=1, splits=2)
```
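We get the same result, but the explode now runs after the shuffle, so the
Exchange moves the un-exploded rows. There is still only one shuffle: the plan
has a single Exchange on `key`, and no second one is inserted for the window
over (`key`, `token`).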
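
As a rough illustration of why one shuffle on `key` can be enough here, the following plain-Scala toy model (no Spark involved) "shuffles" the un-exploded rows by key, explodes inside each partition, and checks that every (key, token) group ends up in a single partition. The `PartitionSketch` object, its case classes, and `partitionOf` are hypothetical names made up for this sketch; `partitionOf` is only a stand-in for a hash partitioner and does not reproduce Spark's actual hashing.

```scala
// Toy model in plain Scala (no Spark): partitioning on `key` alone also
// co-locates every (key, token) group, because explode never changes `key`.
object PartitionSketch {
  final case class Row(key: Long, url: String)
  final case class Exploded(key: Long, token: String)

  // Hypothetical stand-in for a hash partitioner (assumption: not Spark's hashing).
  def partitionOf(key: Long, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // "Shuffle" the un-exploded rows by key, then explode within each partition.
  def shuffleThenExplode(rows: Seq[Row], numPartitions: Int): Map[Int, Seq[Exploded]] =
    rows
      .groupBy(r => partitionOf(r.key, numPartitions))
      .map { case (p, rs) =>
        p -> rs.flatMap(r => r.url.split("/").toSeq.map(t => Exploded(r.key, t)))
      }

  // True if every (key, token) group lives in exactly one partition.
  def groupsAreColocated(parts: Map[Int, Seq[Exploded]]): Boolean = {
    val placements = for {
      (p, exploded) <- parts.toSeq
      row <- exploded
    } yield ((row.key, row.token), p)
    placements.groupBy(_._1).values.forall(_.map(_._2).distinct.size == 1)
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the query above: 4096 ids, key = id / 4, six tokens per url.
    val rows = (0L until 4096L).map(id => Row(id / 4, "asd/asd/asd/asd/asd/asd"))
    val parts = shuffleThenExplode(rows, numPartitions = 8)
    println(s"rows shuffled: ${rows.size}, rows after explode: ${parts.values.map(_.size).sum}")
    println(s"(key, token) groups co-located: ${groupsAreColocated(parts)}")
  }
}
```

In this model the shuffle handles 4096 rows, while the explode produces 24576 rows afterwards, which is the saving the second plan is after.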