I ran it on Databricks community edition which was a local[8] cluster with 6GB of RAM. It ran fine.
That said, looking at the plan, we can definitely simplify this quite a bit. We had a new Window physical execution node for each window expression, when we could have collapsed all of them into a single one.

On Mon, Sep 26, 2016 at 9:03 AM, Jeremy Davis <jerda...@speakeasy.net> wrote:

> Hi, I posted this to users, but didn’t get any responses.
> I just wanted to highlight what seems like excessive memory use when using
> sliding windows.
> I have attached a test case where starting with certainly less than 1MB of
> data I can OOM a 10G heap.
>
> Regards,
> -JD
>
> --------------
>
> import java.sql.Timestamp
>
> import org.apache.spark.sql.SparkSession
> import org.junit.Test
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> import scala.collection.mutable.ArrayBuffer
>
> /**
>  * A Small Unit Test to demonstrate Spark Window Functions OOM
>  */
> class SparkTest {
>
>   @Test
>   def testWindows() {
>     val sparkSession =
>       SparkSession.builder().master("local[7]").appName("tests").getOrCreate()
>     import sparkSession.implicits._
>
>     println("Init Dataset")
>
>     val partitions = (0 until 4)
>     val entries = (0 until 6500)
>
>     //val windows = (5 to 15 by 5) //Works
>     val windows = (5 to 65 by 5) //OOM 10G
>
>     val testData = new ArrayBuffer[(String,Timestamp,Double)]
>
>     for( p <- partitions) {
>       for( e <- entries ) yield {
>         testData += (("Key"+p,new Timestamp(60000*e),e*2.0))
>       }
>     }
>
>     val ds = testData.toDF("key","datetime","value")
>     ds.show()
>
>     var resultFrame = ds
>     resultFrame.schema.fields.foreach(println)
>
>     val baseWin = Window.partitionBy("key").orderBy("datetime")
>     for( win <- windows ) {
>       resultFrame = resultFrame
>         .withColumn("avg"+(-win),avg("value").over(baseWin.rowsBetween(-win,0)))
>         .withColumn("stddev"+(-win),stddev("value").over(baseWin.rowsBetween(-win,0)))
>         .withColumn("min"+(-win),min("value").over(baseWin.rowsBetween(-win,0)))
>         .withColumn("max"+(-win),max("value").over(baseWin.rowsBetween(-win,0)))
>     }
>     resultFrame.show()
>
>   }
>
> }
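
To make the remark about collapsing the Window nodes concrete, here is a minimal sketch of one way the same columns could be requested in a single projection instead of through chained withColumn calls. The object name CollapsedWindows and the helper withAllWindows are hypothetical and not part of the test case above. The assumption, not verified against the reported OOM, is that window expressions sharing the same partitioning and ordering in one select can be planned into a single Window node; running explain() on the result is the way to check what the planner actually does.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Hypothetical helper, not from the original test case.
object CollapsedWindows {

  // Adds avg/stddev/min/max over each sliding frame in one select,
  // rather than one withColumn call per window expression.
  def withAllWindows(ds: DataFrame, windows: Seq[Int]): DataFrame = {
    // Same partitioning and ordering as the test case in the thread.
    val baseWin = Window.partitionBy("key").orderBy("datetime")

    // Build every window column up front; nothing is evaluated yet.
    val windowCols: Seq[Column] = windows.flatMap { win =>
      val frame = baseWin.rowsBetween(-win, 0)
      Seq(
        avg("value").over(frame).as("avg" + (-win)),
        stddev("value").over(frame).as("stddev" + (-win)),
        min("value").over(frame).as("min" + (-win)),
        max("value").over(frame).as("max" + (-win))
      )
    }

    // Keep the original columns and append all window columns at once.
    ds.select(col("*") +: windowCols: _*)
  }
}
```

With the ds from the test case, CollapsedWindows.withAllWindows(ds, 5 to 65 by 5).explain() prints the physical plan, which shows how many Window nodes were produced.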