[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Binzi Cao updated SPARK-20760:
------------------------------
    Description: 

Memory leak of RDD blocks in a long-running RDD process.

I have a long-running application that repeatedly computes RDDs, and I found that the RDD block count keeps increasing on the Spark UI page. The RDD block and memory figures do not match the cached RDDs and their memory usage; it looks like Spark keeps old RDD blocks in memory and either never releases them or never gets a chance to release them. The job eventually dies of an out-of-memory error. In addition, I am not seeing this issue in Spark 1.6.

The code below is minimized, and the issue is reproducible by just running it in local mode.

Scala file:
{code}
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global

case class Person(id: String, name: String)

object RDDApp {
  def run(sc: SparkContext) = {
    while (true) {
      // Build a small random data set and cache it.
      val r = scala.util.Random
      val data = (1 to r.nextInt(100)).toList.map { a =>
        Person(a.toString, a.toString)
      }
      val rdd = sc.parallelize(data)
      rdd.cache
      println("running")
      // Run 100 concurrent filter/collect jobs against the cached RDD.
      val a = (1 to 100).toList.map { x =>
        Future(rdd.filter(_.id == x.toString).collect)
      }
      a.foreach { f =>
        println(Await.ready(f, Duration.Inf).value.get)
      }
      // Unpersist before the next iteration; the blocks should be freed here.
      rdd.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test")
    val sc = new SparkContext(conf)
    run(sc)
  }
}
{code}

build.sbt file:
{code}
name := "RDDTest"

version := "0.1.1"

scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.scalaz"       %% "scalaz-core"       % "7.2.0",
  "org.scalaz"       %% "scalaz-concurrent" % "7.2.0",
  "org.apache.spark" %  "spark-core_2.11"   % "2.1.0" % "provided",
  "org.apache.spark" %  "spark-hive_2.11"   % "2.1.0" % "provided"
)

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")

mainClass in assembly := Some("RDDApp")

test in assembly := {}
{code}

To reproduce it, just run:
{code}
spark-2.1.0-bin-hadoop2.7/bin/spark-submit --driver-memory 4G \
  --executor-memory 4G \
  --executor-cores 1 \
  --num-executors 1 \
  --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
{code}
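To check whether the driver itself still tracks the old RDDs, or whether only the UI accounting grows, the loop can be instrumented after each rdd.unpersist(). The sketch below is a hypothetical helper, not part of the original repro; getPersistentRDDs, getStorageLevel, and getExecutorMemoryStatus are existing SparkContext/RDD APIs in 2.1.0:
{code}
// Hypothetical diagnostic helper (not part of the original repro): logs what
// the driver still tracks as persistent, for comparison with the UI Storage page.
def logStorageState(sc: org.apache.spark.SparkContext): Unit = {
  // RDDs this SparkContext still considers persistent, keyed by RDD id.
  val persisted = sc.getPersistentRDDs
  println(s"persistent RDDs: ${persisted.size}")
  persisted.foreach { case (id, rdd) =>
    println(s"  rdd=$id level=${rdd.getStorageLevel.description}")
  }
  // Per-executor (max storage memory, remaining storage memory), in bytes.
  sc.getExecutorMemoryStatus.foreach { case (executor, (max, remaining)) =>
    println(s"  $executor used=${max - remaining} max=$max")
  }
}
{code}
If the block count on the UI keeps growing while getPersistentRDDs stays empty after each unpersist, the driver has already dropped its references and the retained blocks live elsewhere (for example in the block manager or in the UI's accounting).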
> Memory Leak of RDD blocks
> --------------------------
>
>                 Key: SPARK-20760
>                 URL: https://issues.apache.org/jira/browse/SPARK-20760
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager
>    Affects Versions: 2.1.0
>        Environment: Spark 2.1.0
>            Reporter: Binzi Cao
>            Priority: Critical
>
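One way to test the "never got a chance to release it" part of the description: state that Spark's ContextCleaner is responsible for cleaning up is released only after the corresponding driver-side objects are garbage collected (the cleaner reacts to weak references), so forcing a periodic GC in a repro loop helps distinguish deferred cleanup from a genuine leak. The variant below is a simplified sketch under that assumption, not the original RDDApp code:
{code}
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical simplified variant of RDDApp.run (an assumption, not the
// original code): same cache/compute/unpersist cycle, plus a periodic forced
// GC so the ContextCleaner's weak references can fire.
object GcProbe {
  def run(sc: SparkContext): Unit = {
    var i = 0
    while (true) {
      i += 1
      val rdd = sc.parallelize(1 to 100)
      rdd.cache()
      rdd.count()          // materialize the cached blocks
      rdd.unpersist()
      if (i % 100 == 0) {
        System.gc()        // trigger driver-side GC
        Thread.sleep(1000) // give the cleaner thread a moment (arbitrary)
        println(s"iteration $i: persistent RDDs = ${sc.getPersistentRDDs.size}")
      }
    }
  }

  def main(args: Array[String]): Unit =
    run(new SparkContext(new SparkConf().setAppName("gc-probe")))
}
{code}
If the RDD block count on the UI stops growing with this variant, the blocks were waiting on garbage collection rather than leaking outright.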
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)