Github user uzadude commented on the issue:

    https://github.com/apache/spark/pull/22964
  
    This is the original query. We can see the explode followed by the shuffle:
    
    ```
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions._
    
    val N = 1 << 12
    
    spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")
    
    val tokens = spark.range(0, N, 1, 2).selectExpr(
      "floor(id/4) as key", "'asd/asd/asd/asd/asd/asd' as url")
      .selectExpr("*", "explode(split(url, '/')) as token")
    
    val w = Window.partitionBy("key", "token")
    val res = tokens.withColumn("cnt", count("token").over(w))
    
    res.explain(true)
    
    
    == Optimized Logical Plan ==
    Window [count(token#11) windowspecdefinition(key#6L, token#11, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt#17L], [key#6L, token#11]
    +- Generate explode([asd,asd,asd,asd,asd,asd]), false, [token#11]
       +- Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L, asd/asd/asd/asd/asd/asd AS url#7]
          +- Range (0, 4096, step=1, splits=Some(2))
    
    == Physical Plan ==
    Window [count(token#11) windowspecdefinition(key#6L, token#11, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt#17L], [key#6L, token#11]
    +- *(2) Sort [key#6L ASC NULLS FIRST, token#11 ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(key#6L, token#11, 1)
          +- Generate explode([asd,asd,asd,asd,asd,asd]), [key#6L, url#7], false, [token#11]
             +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L, asd/asd/asd/asd/asd/asd AS url#7]
                +- *(1) Range (0, 4096, step=1, splits=2)
    ```
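
    For scale, here is a quick sanity check (just a sketch, assuming the same spark session and N as above) of how much the explode inflates the row count that the first plan has to shuffle:
    ```
    // the explode runs before the Exchange in the first plan, so the
    // shuffle has to move 6x the base row count
    val base = spark.range(0, N, 1, 2).selectExpr(
      "floor(id/4) as key", "'asd/asd/asd/asd/asd/asd' as url")

    println(base.count())                                         // 4096
    println(base.selectExpr("explode(split(url, '/'))").count())  // 24576 (6 tokens per row)
    ```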
    
    And if we add a repartition on key before the generate:
    ```
    val tokens = spark.range(0, N, 1, 2).selectExpr(
      "floor(id/4) as key", "'asd/asd/asd/asd/asd/asd' as url")
      .repartition(col("key"))
      .selectExpr("*", "explode(split(url, '/')) as token")
    
    val w = Window.partitionBy("key", "token")
    val res = tokens.withColumn("cnt", count("token").over(w))
    
    res.explain(true)
    
    == Optimized Logical Plan ==
    Window [count(token#11) windowspecdefinition(key#6L, token#11, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt#17L], [key#6L, token#11]
    +- Generate explode([asd,asd,asd,asd,asd,asd]), false, [token#11]
       +- RepartitionByExpression [key#6L], 1
          +- Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L, asd/asd/asd/asd/asd/asd AS url#7]
             +- Range (0, 4096, step=1, splits=Some(2))
    
    == Physical Plan ==
    Window [count(token#11) windowspecdefinition(key#6L, token#11, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt#17L], [key#6L, token#11]
    +- *(2) Sort [key#6L ASC NULLS FIRST, token#11 ASC NULLS FIRST], false, 0
       +- Generate explode([asd,asd,asd,asd,asd,asd]), [key#6L, url#7], false, [token#11]
          +- Exchange hashpartitioning(key#6L, 1)
             +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L, asd/asd/asd/asd/asd/asd AS url#7]
                +- *(1) Range (0, 4096, step=1, splits=2)
    ```
    We get the same result, just with the explode applied after the shuffle, and still only one shuffle: partitioning by key alone already satisfies the window's required clustering on (key, token), since rows that share (key, token) necessarily share key, so the repartition replaces the window's exchange instead of adding one.
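
    To double-check, we can count the Exchange operators in the physical plan (just a sketch, assuming Spark 2.x, where the Exchange node lives in org.apache.spark.sql.execution.exchange):
    ```
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.exchange.Exchange

    // count the shuffle (Exchange) operators in a DataFrame's physical plan
    def countShuffles(df: DataFrame): Int =
      df.queryExecution.executedPlan.collect { case e: Exchange => e }.size

    // the explicit repartition on key replaces the window's exchange rather
    // than adding a second one, so the repartitioned variant shuffles once
    println(countShuffles(res))  // 1
    ```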


