Florentino Sainz created SPARK-29265:
----------------------------------------
             Summary: Window orderBy causing full-DF orderBy
                 Key: SPARK-29265
                 URL: https://issues.apache.org/jira/browse/SPARK-29265
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.4.4, 2.4.3, 2.3.0
         Environment: Any
            Reporter: Florentino Sainz


Hi,

I hit this problem in "real" environments and also made a self-contained test (attached).

Given this Window definition:

{code:java}
val myWindow = Window.partitionBy($"word").orderBy("word")
val filt2 = filtrador.withColumn("avg_Time", avg($"number").over(myWindow))
{code}

As a user, I would expect either:
1- An error or warning (because the orderBy uses one of the columns of the window's partitionBy), or
2- A mostly useless operation which just orders the rows inside each window partition but doesn't affect performance much.

What I currently see:

*When I use "myWindow" in any DataFrame, that Window.orderBy performs a global orderBy of the whole DataFrame, similar to dataframe.orderBy("word").*

*In my real environment the program became very slow or crashed and didn't finish in time, because a global orderBy sends all the data through a single executor.*

In the test I can see that all elements of my DF end up in a single partition (a side effect of the global orderBy).

Full code showing the error (note how the mapPartitions call reports 99 rows in one partition) <-- you can paste it into IntelliJ or any other IDE and it should work:

{code:java}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.autoBroadcastJoinThreshold", -1)
      .master("local[4]")
      .appName("Word Count")
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    val expectedSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false),
      StructField("dummyColumn", StringType, false)
    )

    val expectedData = Seq(
      Row(8, "bat", "test"),
      Row(64, "mouse", "test"),
      Row(-27, "horse", "test")
    )

    // Explode multiplies each of the 3 rows into 99 rows (dummy = 1..99)
    val filtrador = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    ).withColumn("dummy", explode(array((1 until 100).map(lit): _*)))

    // Window that orders by the same column it partitions by
    val myWindow = Window.partitionBy($"word").orderBy("word")
    val filt2 = filtrador.withColumn("avg_Time", avg($"number").over(myWindow))

    filt2.show

    // Print the number of rows in each partition
    filt2.rdd.mapPartitions(iter => Iterator(iter.size), true).collect().foreach(println)
  }
}
{code}
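For completeness, a minimal workaround sketch (my own addition, not part of the attached test; the names noOrderWindow and filtFixed are hypothetical): since avg over this window does not actually need an ordering (every row in a partition shares the same "word", so the frame covers the whole partition either way), the orderBy can simply be dropped. Assuming the reported global sort shows up in the plan, comparing the two physical plans with explain() should make the difference visible.

{code:java}
// Hypothetical workaround: partitionBy only, no orderBy. For an aggregate like
// avg the frame is then the entire partition, which gives the same per-word
// average as the orderBy variant because all rows in a partition tie on "word".
val noOrderWindow = Window.partitionBy($"word")
val filtFixed = filtrador.withColumn("avg_Time", avg($"number").over(noOrderWindow))

// Compare the physical plans: if the original version really triggers a global
// sort, its plan should contain an extra sort/range exchange that the
// partitionBy-only version does not need.
filt2.explain()
filtFixed.explain()

// Re-check the partition sizes the same way as in the test above
filtFixed.rdd.mapPartitions(iter => Iterator(iter.size), true).collect().foreach(println)
{code}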