Ohad Raviv created SPARK-25963:
----------------------------------

             Summary: Optimize generate followed by window
                 Key: SPARK-25963
                 URL: https://issues.apache.org/jira/browse/SPARK-25963
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.3.0, 2.4.0
            Reporter: Ohad Raviv
We've noticed that for our use cases, when we have an explode followed by a window function, we can almost always optimize it by adding a repartition by the window's partition columns before the explode. For example:

{code:java}
import org.apache.spark.sql.functions._

val N = 1 << 12
spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")
val tokens = spark.range(N).selectExpr(
    "floor(id/4) as key",
    "'asd/asd/asd/asd/asd/asd' as url")
  // .repartition(col("key"))  // uncommenting this shuffle avoids the post-explode Exchange
  .selectExpr("*", "explode(split(url, '/')) as token")

import org.apache.spark.sql.expressions._
val w = Window.partitionBy("key", "token")
val res = tokens.withColumn("cnt", count("token").over(w))
res.explain(true)
{code}

{noformat}
== Optimized Logical Plan ==
Window [count(token#11) windowspecdefinition(key#6L, token#11, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt#17L], [key#6L, token#11]
+- Generate explode([asd,asd,asd,asd,asd,asd]), false, [token#11]
   +- Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L, asd/asd/asd/asd/asd/asd AS url#7]
      +- Range (0, 4096, step=1, splits=Some(1))
{noformat}

Currently, all the data is exploded in the first stage, then shuffled, and only then aggregated. We can achieve exactly the same computation, while shuffling far less data, if we first shuffle and then, in the second stage, explode and aggregate. I have a PR that tries to resolve this; I'm just not sure I've thought about all the cases.
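As a self-contained sketch of the manual workaround described above (the `local[*]` SparkSession setup is an assumption for illustration; the issue itself proposes doing this rewrite automatically in the optimizer): repartitioning by `key` before the explode means the explode runs after the shuffle, and because hash-partitioning on `key` already satisfies the window's required distribution over `(key, token)`, no second Exchange is needed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Hypothetical standalone setup, not part of the original snippet.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("generate-then-window")
  .getOrCreate()

val N = 1 << 12
val base = spark.range(N).selectExpr(
  "floor(id/4) as key",
  "'asd/asd/asd/asd/asd/asd' as url")

// Shuffle *before* exploding: each row is small pre-explode, so the
// Exchange moves one row per id instead of one row per token.
val tokens = base
  .repartition(col("key"))
  .selectExpr("*", "explode(split(url, '/')) as token")

// Rows with equal (key, token) share a key, hence a partition, so the
// window can run without inserting another shuffle after Generate.
val w = Window.partitionBy("key", "token")
val res = tokens.withColumn("cnt", count("token").over(w))
res.explain(true)
```

Inspecting the physical plan should show the Exchange below the Generate rather than above the Window, which is exactly the stage layout the proposed optimization would produce automatically.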