GitHub user uzadude commented on the issue: https://github.com/apache/spark/pull/22964 This is the original query. We can see that the explode is followed by the shuffle: ``` import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions._ val N = 1 << 12 spark.sql("set spark.sql.autoBroadcastJoinThreshold=0") val tokens = spark.range(0, N, 1, 2).selectExpr( "floor(id/4) as key", "'asd/asd/asd/asd/asd/asd' as url") .selectExpr("*", "explode(split(url, '/')) as token") val w = Window.partitionBy("key", "token") val res = tokens.withColumn("cnt", count("token").over(w)) res.explain(true) == Optimized Logical Plan == Window [count(token#11) windowspecdefinition(key#6L, token#11, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt#17L], [key#6L, token#11] +- Generate explode([asd,asd,asd,asd,asd,asd]), false, [token#11] +- Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L, asd/asd/asd/asd/asd/asd AS url#7] +- Range (0, 4096, step=1, splits=Some(2)) == Physical Plan == Window [count(token#11) windowspecdefinition(key#6L, token#11, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt#17L], [key#6L, token#11] +- *(2) Sort [key#6L ASC NULLS FIRST, token#11 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(key#6L, token#11, 1) +- Generate explode([asd,asd,asd,asd,asd,asd]), [key#6L, url#7], false, [token#11] +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L, asd/asd/asd/asd/asd/asd AS url#7] +- *(1) Range (0, 4096, step=1, splits=2) ``` And if we add the repartition before the generate: ``` val tokens = spark.range(0, N, 1, 2).selectExpr( "floor(id/4) as key", "'asd/asd/asd/asd/asd/asd' as url") .repartition(col("key")) .selectExpr("*", "explode(split(url, '/')) as token") val w = Window.partitionBy("key", "token") val res = tokens.withColumn("cnt", count("token").over(w)) res.explain(true) == Optimized Logical Plan == Window [count(token#11) windowspecdefinition(key#6L, 
token#11, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt#17L], [key#6L, token#11] +- Generate explode([asd,asd,asd,asd,asd,asd]), false, [token#11] +- RepartitionByExpression [key#6L], 1 +- Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L, asd/asd/asd/asd/asd/asd AS url#7] +- Range (0, 4096, step=1, splits=Some(2)) == Physical Plan == Window [count(token#11) windowspecdefinition(key#6L, token#11, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt#17L], [key#6L, token#11] +- *(2) Sort [key#6L ASC NULLS FIRST, token#11 ASC NULLS FIRST], false, 0 +- Generate explode([asd,asd,asd,asd,asd,asd]), [key#6L, url#7], false, [token#11] +- Exchange hashpartitioning(key#6L, 1) +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L, asd/asd/asd/asd/asd/asd AS url#7] +- *(1) Range (0, 4096, step=1, splits=2) ``` We get the same result, just with the explode after the shuffle, and there is only one shuffle.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org