[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen updated SPARK-20760:
------------------------------
    Flags:   (was: Important)
 Priority: Major  (was: Critical)

I see something similar. The RDDs are unpersisted but seem to leave blocks around. If you remove the Future/Await part, it doesn't occur. There might be a race condition in which multiple threads cause blocks to be cached several times but only some are freed later.

> Memory Leak of RDD blocks
> -------------------------
>
>                 Key: SPARK-20760
>                 URL: https://issues.apache.org/jira/browse/SPARK-20760
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager
>    Affects Versions: 2.1.0
>         Environment: Spark 2.1.0
>            Reporter: Binzi Cao
>         Attachments: RDD Blocks .png
>
>
> Memory leak of RDD blocks in a long-running RDD process.
> We have a long-running application that does computations over RDDs, and we
> found that the number of RDD blocks keeps increasing on the Spark UI page.
> The RDD blocks and memory usage do not match the cached RDDs and memory. It
> looks like Spark keeps old RDDs in memory and either never releases them or
> never gets a chance to release them. The job eventually dies of an
> out-of-memory error.
> In addition, I'm not seeing this issue in Spark 1.6. We are seeing the same
> issue in YARN cluster mode in both Kafka streaming and batch applications.
> The issue in streaming is similar; however, the RDD blocks seem to grow a
> bit more slowly than in batch jobs.
> Below is the sample code; the issue is reproducible by just running it in
> local mode.
> Scala file:
> {code}
> import scala.concurrent.duration.Duration
> import scala.util.{Try, Failure, Success}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.rdd.RDD
> import scala.concurrent._
> import ExecutionContext.Implicits.global
>
> case class Person(id: String, name: String)
>
> object RDDApp {
>   def run(sc: SparkContext) = {
>     while (true) {
>       // Build a small random dataset, cache it, query it from 100 futures,
>       // then unpersist it; the RDD block count in the UI still grows.
>       val r = scala.util.Random
>       val data = (1 to r.nextInt(100)).toList.map { a =>
>         Person(a.toString, a.toString)
>       }
>       val rdd = sc.parallelize(data)
>       rdd.cache()
>       println("running")
>       val a = (1 to 100).toList.map { x =>
>         Future(rdd.filter(_.id == x.toString).collect())
>       }
>       a.foreach { f =>
>         println(Await.ready(f, Duration.Inf).value.get)
>       }
>       rdd.unpersist()
>     }
>   }
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("test")
>     val sc = new SparkContext(conf)
>     run(sc)
>   }
> }
> {code}
> build.sbt file:
> {code}
> name := "RDDTest"
> version := "0.1.1"
> scalaVersion := "2.11.5"
>
> libraryDependencies ++= Seq(
>   "org.scalaz"       %% "scalaz-core"       % "7.2.0",
>   "org.scalaz"       %% "scalaz-concurrent" % "7.2.0",
>   "org.apache.spark" %  "spark-core_2.11"   % "2.1.0" % "provided",
>   "org.apache.spark" %  "spark-hive_2.11"   % "2.1.0" % "provided"
> )
>
> addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
>
> mainClass in assembly := Some("RDDApp")
> test in assembly := {}
> {code}
> To reproduce it, just run:
> {code}
> spark-2.1.0-bin-hadoop2.7/bin/spark-submit --driver-memory 4G \
>   --executor-memory 4G \
>   --executor-cores 1 \
>   --num-executors 1 \
>   --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
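For comparison with Sean Owen's observation above that the leak does not occur once the Future/Await part is removed, here is a minimal single-threaded sketch of the same loop. The object name RDDAppSequential is invented for illustration; everything else mirrors the repro code, with the 100 filter/collect jobs submitted one at a time from the driver thread instead of concurrently through Futures.

{code}
import org.apache.spark.{SparkConf, SparkContext}

case class Person(id: String, name: String)

// Hypothetical single-threaded variant of the repro: the same
// cache/query/unpersist cycle, but without Future/Await, which per the
// comment above is the configuration in which the leak does not occur.
object RDDAppSequential {
  def run(sc: SparkContext): Unit = {
    while (true) {
      val r = scala.util.Random
      val data = (1 to r.nextInt(100)).toList.map { a =>
        Person(a.toString, a.toString)
      }
      val rdd = sc.parallelize(data)
      rdd.cache()
      // Same 100 filter/collect jobs, run sequentially on the driver
      // thread rather than concurrently via Futures.
      (1 to 100).foreach { x =>
        println(rdd.filter(_.id == x.toString).collect().toList)
      }
      rdd.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test-sequential"))
    run(sc)
  }
}
{code}

If this variant holds the RDD block count steady in the UI while the original does not, that would support the race-condition hypothesis: concurrent jobs re-caching blocks that a later unpersist only partially frees.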