Hi, I posted this to the users list but didn't get any responses, so I wanted to highlight what looks like excessive memory use with sliding window functions. The attached test case starts from well under 1 MB of input data and can OOM a 10 GB heap.
Regards,
-JD
--------------
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.junit.Test
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import scala.collection.mutable.ArrayBuffer
/**
 * A small unit test demonstrating an OOM with Spark window functions.
 */
class SparkTest {

  @Test
  def testWindows(): Unit = {
    val sparkSession =
      SparkSession.builder().master("local[7]").appName("tests").getOrCreate()
    import sparkSession.implicits._

    println("Init Dataset")
    val partitions = 0 until 4
    val entries = 0 until 6500
    //val windows = 5 to 15 by 5  // Works
    val windows = 5 to 65 by 5    // OOMs a 10G heap

    // Build ~26k rows of (key, timestamp, value): well under 1 MB of data.
    val testData = new ArrayBuffer[(String, Timestamp, Double)]
    for (p <- partitions; e <- entries) {
      testData += (("Key" + p, new Timestamp(60000L * e), e * 2.0))
    }

    val ds = testData.toDF("key", "datetime", "value")
    ds.show()

    var resultFrame = ds
    resultFrame.schema.fields.foreach(println)

    // For each window size, add four sliding-window aggregates over the
    // preceding `win` rows up to and including the current row.
    val baseWin = Window.partitionBy("key").orderBy("datetime")
    for (win <- windows) {
      resultFrame = resultFrame
        .withColumn("avg" + (-win), avg("value").over(baseWin.rowsBetween(-win, 0)))
        .withColumn("stddev" + (-win), stddev("value").over(baseWin.rowsBetween(-win, 0)))
        .withColumn("min" + (-win), min("value").over(baseWin.rowsBetween(-win, 0)))
        .withColumn("max" + (-win), max("value").over(baseWin.rowsBetween(-win, 0)))
    }
    resultFrame.show()
  }
}
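--------------
In case it helps narrow things down, the loop in testWindows above can also be written as a single projection instead of ~50 chained withColumn calls. This is only a sketch, on the assumption that driver-side growth of the logical plan from the chained withColumn calls contributes to the memory use; I have not confirmed that it avoids the OOM, and the window aggregates themselves are unchanged. It reuses windows, baseWin, and ds from the test above.

    // Sketch: same aggregates, built as one select. Assumption (unverified):
    // plan growth from the chained withColumn calls is part of the problem.
    val winExprs = windows.flatMap { win =>
      val frame = baseWin.rowsBetween(-win, 0)
      Seq(
        avg("value").over(frame).as("avg" + (-win)),
        stddev("value").over(frame).as("stddev" + (-win)),
        min("value").over(frame).as("min" + (-win)),
        max("value").over(frame).as("max" + (-win))
      )
    }
    val singleSelect = ds.select(col("*") +: winExprs: _*)
    singleSelect.show()

If plan size turns out not to be the culprit, Dataset.checkpoint() between loop iterations (after setting a checkpoint directory) would be another way to rule lineage growth in or out.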
