[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-06-06 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Attachment: Storage in spark 2.1.1.png
RDD blocks in spark 2.1.1.png

Spark 2.1.1 Test Result
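
For context, rerunning the same repro against Spark 2.1.1 presumably only means bumping the Spark versions in build.sbt and using the 2.1.1 distribution's spark-submit; the sketch below shows those assumed changes and is not the exact setup behind the attached screenshots.

{code}
// Assumed build.sbt change only; the reporter did not post the 2.1.1 build file.
libraryDependencies ++= Seq (
  "org.scalaz" %% "scalaz-core" % "7.2.0",
  "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
  "org.apache.spark" % "spark-core_2.11" % "2.1.1" % "provided",
  "org.apache.spark" % "spark-hive_2.11" % "2.1.1" % "provided"
)
{code}

The same spark-submit command would then be pointed at the 2.1.1 distribution (e.g. spark-2.1.1-bin-hadoop2.7/bin/spark-submit).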

> Memory Leak of RDD blocks 
> --
>
> Key: SPARK-20760
> URL: https://issues.apache.org/jira/browse/SPARK-20760
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.1.0
> Environment: Spark 2.1.0
>Reporter: Binzi Cao
> Attachments: RDD blocks in spark 2.1.1.png, RDD Blocks .png, Storage 
> in spark 2.1.1.png
>
>
> Memory leak of RDD blocks in a long-running RDD process.
> We have a long-running application that performs computations on RDDs, and we 
> found that the number of RDD blocks keeps increasing in the Spark UI. The RDD 
> blocks and memory usage do not match the cached RDDs and their memory. It 
> looks like Spark keeps old RDDs in memory and never releases them, or never 
> gets a chance to release them. The job eventually dies with an out-of-memory 
> error. 
> In addition, I'm not seeing this issue in Spark 1.6. We see the same issue in 
> YARN cluster mode, in both Kafka streaming and batch applications. The 
> streaming case is similar, although the RDD blocks seem to grow a bit more 
> slowly than in batch jobs. 
> Below is the sample code; the issue is reproducible by simply running it in 
> local mode. 
> Scala file:
> {code}
> import scala.concurrent.duration.Duration
> import scala.util.{Try, Failure, Success}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.rdd.RDD
> import scala.concurrent._
> import ExecutionContext.Implicits.global
> case class Person(id: String, name: String)
> object RDDApp {
>   def run(sc: SparkContext) = {
> while (true) {
>   val r = scala.util.Random
>   val data = (1 to r.nextInt(100)).toList.map { a =>
> Person(a.toString, a.toString)
>   }
>   val rdd = sc.parallelize(data)
>   rdd.cache
>   println("running")
>   val a = (1 to 100).toList.map { x =>
> Future(rdd.filter(_.id == x.toString).collect)
>   }
>   a.foreach { f =>
> println(Await.ready(f, Duration.Inf).value.get)
>   }
>   rdd.unpersist()
> }
>   }
>   def main(args: Array[String]): Unit = {
>val conf = new SparkConf().setAppName("test")
> val sc   = new SparkContext(conf)
> run(sc)
>   }
> }
> {code}
> build sbt file:
> {code}
> name := "RDDTest"
> version := "0.1.1"
> scalaVersion := "2.11.5"
> libraryDependencies ++= Seq (
> "org.scalaz" %% "scalaz-core" % "7.2.0",
> "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
> "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
> "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
>   )
> addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
> mainClass in assembly := Some("RDDApp")
> test in assembly := {}
> {code}
> To reproduce it: 
> Just 
> {code}
> spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
> --executor-memory 4G \
> --executor-cores 1 \
> --num-executors 1 \
> --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-05-16 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-20760:
--
   Flags:   (was: Important)
Priority: Major  (was: Critical)

I see something similar. The RDDs are unpersisted but seem to leave blocks 
around. If you remove the Future/Await part it doesn't occur. There might be 
some race condition where multiple threads cause blocks to be cached several 
times but only some are freed later.
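
For comparison, below is a minimal sketch of the same loop with the Future/Await part removed, i.e. the variant described above as not leaking; the sequential foreach is an assumed reading of that variant, not code taken from the reporter.

{code}
// Sketch only (assumed variant): the same workload, but the filter/collect jobs
// run sequentially on the calling thread, so unpersist() is reached only after
// every job that uses the cached blocks has finished.
import org.apache.spark.SparkContext

case class Person(id: String, name: String)

object SequentialRDDApp {
  def run(sc: SparkContext): Unit = {
    while (true) {
      val r = scala.util.Random
      val data = (1 to r.nextInt(100)).toList.map(a => Person(a.toString, a.toString))
      val rdd = sc.parallelize(data)
      rdd.cache()
      (1 to 100).foreach { x =>
        // No Future/Await: each collect finishes before the next filter starts.
        println(rdd.filter(_.id == x.toString).collect().toList)
      }
      rdd.unpersist()
    }
  }
}
{code}

If the RDD block count stays flat with this variant but climbs with the original Future-based loop, that would be consistent with the race-condition hypothesis above.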

> Memory Leak of RDD blocks 
> --
>
> Key: SPARK-20760
> URL: https://issues.apache.org/jira/browse/SPARK-20760
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.1.0
> Environment: Spark 2.1.0
>Reporter: Binzi Cao
> Attachments: RDD Blocks .png
>
>
> Memory leak of RDD blocks in a long-running RDD process.
> We have a long-running application that performs computations on RDDs, and we 
> found that the number of RDD blocks keeps increasing in the Spark UI. The RDD 
> blocks and memory usage do not match the cached RDDs and their memory. It 
> looks like Spark keeps old RDDs in memory and never releases them, or never 
> gets a chance to release them. The job eventually dies with an out-of-memory 
> error. 
> In addition, I'm not seeing this issue in Spark 1.6. We see the same issue in 
> YARN cluster mode, in both Kafka streaming and batch applications. The 
> streaming case is similar, although the RDD blocks seem to grow a bit more 
> slowly than in batch jobs. 
> Below is the sample code; the issue is reproducible by simply running it in 
> local mode. 
> Scala file:
> {code}
> import scala.concurrent.duration.Duration
> import scala.util.{Try, Failure, Success}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.rdd.RDD
> import scala.concurrent._
> import ExecutionContext.Implicits.global
> case class Person(id: String, name: String)
> object RDDApp {
>   def run(sc: SparkContext) = {
> while (true) {
>   val r = scala.util.Random
>   val data = (1 to r.nextInt(100)).toList.map { a =>
> Person(a.toString, a.toString)
>   }
>   val rdd = sc.parallelize(data)
>   rdd.cache
>   println("running")
>   val a = (1 to 100).toList.map { x =>
> Future(rdd.filter(_.id == x.toString).collect)
>   }
>   a.foreach { f =>
> println(Await.ready(f, Duration.Inf).value.get)
>   }
>   rdd.unpersist()
> }
>   }
>   def main(args: Array[String]): Unit = {
>val conf = new SparkConf().setAppName("test")
> val sc   = new SparkContext(conf)
> run(sc)
>   }
> }
> {code}
> build sbt file:
> {code}
> name := "RDDTest"
> version := "0.1.1"
> scalaVersion := "2.11.5"
> libraryDependencies ++= Seq (
> "org.scalaz" %% "scalaz-core" % "7.2.0",
> "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
> "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
> "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
>   )
> addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
> mainClass in assembly := Some("RDDApp")
> test in assembly := {}
> {code}
> To reproduce it: 
> Just 
> {code}
> spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
> --executor-memory 4G \
> --executor-cores 1 \
> --num-executors 1 \
> --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
> {code}






[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Description: 
Memory leak of RDD blocks in a long-running RDD process.

We have a long-running application that performs computations on RDDs, and we 
found that the number of RDD blocks keeps increasing in the Spark UI. The RDD 
blocks and memory usage do not match the cached RDDs and their memory. It looks 
like Spark keeps old RDDs in memory and never releases them, or never gets a 
chance to release them. The job eventually dies with an out-of-memory error. 

In addition, I'm not seeing this issue in Spark 1.6. We see the same issue in 
YARN cluster mode, in both Kafka streaming and batch applications. The streaming 
case is similar, although the RDD blocks seem to grow a bit more slowly than in 
batch jobs. 

Below is the sample code; the issue is reproducible by simply running it in 
local mode. 
Scala file:
{code}
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global
case class Person(id: String, name: String)
object RDDApp {
  def run(sc: SparkContext) = {
while (true) {
  val r = scala.util.Random
  val data = (1 to r.nextInt(100)).toList.map { a =>
Person(a.toString, a.toString)
  }
  val rdd = sc.parallelize(data)
  rdd.cache
  println("running")
  val a = (1 to 100).toList.map { x =>
Future(rdd.filter(_.id == x.toString).collect)
  }
  a.foreach { f =>
println(Await.ready(f, Duration.Inf).value.get)
  }
  rdd.unpersist()
}

  }
  def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setAppName("test")
val sc   = new SparkContext(conf)
run(sc)

  }
}

{code}
build sbt file:
{code}
name := "RDDTest"
version := "0.1.1"


scalaVersion := "2.11.5"

libraryDependencies ++= Seq (
"org.scalaz" %% "scalaz-core" % "7.2.0",
"org.scalaz" %% "scalaz-concurrent" % "7.2.0",
"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
"org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
  )

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
mainClass in assembly := Some("RDDApp")
test in assembly := {}
{code}

To reproduce it: 

Just 
{code}

spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
--executor-memory 4G \
--executor-cores 1 \
--num-executors 1 \
--class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
{code}


  was:
Memory leak of RDD blocks in a long-running RDD process.

I have a long-running application that performs computations on RDDs, and I 
found that the number of RDD blocks keeps increasing in the Spark UI. The RDD 
blocks and memory usage do not match the cached RDDs and their memory. It looks 
like Spark keeps old RDDs in memory and never releases them, or never gets a 
chance to release them. The job eventually dies with an out-of-memory error. 

In addition, I'm not seeing this issue in Spark 1.6. 

Below is the minimized code; the issue is reproducible by simply running it in 
local mode. 
Scala file:
{code}
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global
case class Person(id: String, name: String)
object RDDApp {
  def run(sc: SparkContext) = {
while (true) {
  val r = scala.util.Random
  val data = (1 to r.nextInt(100)).toList.map { a =>
Person(a.toString, a.toString)
  }
  val rdd = sc.parallelize(data)
  rdd.cache
  println("running")
  val a = (1 to 100).toList.map { x =>
Future(rdd.filter(_.id == x.toString).collect)
  }
  a.foreach { f =>
println(Await.ready(f, Duration.Inf).value.get)
  }
  rdd.unpersist()
}

  }
  def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setAppName("test")
val sc   = new SparkContext(conf)
run(sc)

  }
}

{code}
build sbt file:
{code}
name := "RDDTest"
version := "0.1.1"


scalaVersion := "2.11.5"

libraryDependencies ++= Seq (
"org.scalaz" %% "scalaz-core" % "7.2.0",
"org.scalaz" %% "scalaz-concurrent" % "7.2.0",
"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
"org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
  )

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
mainClass in assembly := Some("RDDApp")
test in assembly := {}
{code}

To reproduce it: 

Just 
{code}

spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
--executor-memory 4G \
--executor-cores 1 \
--num-executors 1 \
--class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar

[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Attachment: RDD Blocks .png

RDD blocks increase dramatically after the app has been running for a couple of 
hours; see the attached screenshot of the Spark UI page.
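
To track the same growth without watching the UI, one could log the driver's storage view at the end of each loop iteration. SparkContext.getRDDStorageInfo and SparkContext.getPersistentRDDs are existing Spark APIs, but the helper below is only an illustrative sketch, not part of the reported application.

{code}
import org.apache.spark.SparkContext

object StorageLogger {
  // Sketch: print how many RDDs and cached partitions the driver still tracks,
  // so the growth seen on the Storage page can also be confirmed from the logs.
  def logStorage(sc: SparkContext, iteration: Int): Unit = {
    val infos = sc.getRDDStorageInfo            // RDDs that still have cached blocks
    val persistent = sc.getPersistentRDDs.size  // RDDs still registered as persistent
    val cachedPartitions = infos.map(_.numCachedPartitions).sum
    println(s"iter=$iteration persistentRDDs=$persistent " +
      s"cachedRDDs=${infos.length} cachedPartitions=$cachedPartitions")
  }
}
{code}

Calling this right after rdd.unpersist() in the repro loop should show whether the cached RDD and partition counts return to zero or keep climbing across iterations.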

> Memory Leak of RDD blocks 
> --
>
> Key: SPARK-20760
> URL: https://issues.apache.org/jira/browse/SPARK-20760
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.1.0
> Environment: Spark 2.1.0
>Reporter: Binzi Cao
>Priority: Critical
> Attachments: RDD Blocks .png
>
>
> Memory leak of RDD blocks in a long-running RDD process.
> I have a long-running application that performs computations on RDDs, and I 
> found that the number of RDD blocks keeps increasing in the Spark UI. The RDD 
> blocks and memory usage do not match the cached RDDs and their memory. It 
> looks like Spark keeps old RDDs in memory and never releases them, or never 
> gets a chance to release them. The job eventually dies with an out-of-memory 
> error. 
> In addition, I'm not seeing this issue in Spark 1.6. 
> Below is the minimized code; the issue is reproducible by simply running it 
> in local mode. 
> Scala file:
> {code}
> import scala.concurrent.duration.Duration
> import scala.util.{Try, Failure, Success}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.rdd.RDD
> import scala.concurrent._
> import ExecutionContext.Implicits.global
> case class Person(id: String, name: String)
> object RDDApp {
>   def run(sc: SparkContext) = {
> while (true) {
>   val r = scala.util.Random
>   val data = (1 to r.nextInt(100)).toList.map { a =>
> Person(a.toString, a.toString)
>   }
>   val rdd = sc.parallelize(data)
>   rdd.cache
>   println("running")
>   val a = (1 to 100).toList.map { x =>
> Future(rdd.filter(_.id == x.toString).collect)
>   }
>   a.foreach { f =>
> println(Await.ready(f, Duration.Inf).value.get)
>   }
>   rdd.unpersist()
> }
>   }
>   def main(args: Array[String]): Unit = {
>val conf = new SparkConf().setAppName("test")
> val sc   = new SparkContext(conf)
> run(sc)
>   }
> }
> {code}
> build sbt file:
> {code}
> name := "RDDTest"
> version := "0.1.1"
> scalaVersion := "2.11.5"
> libraryDependencies ++= Seq (
> "org.scalaz" %% "scalaz-core" % "7.2.0",
> "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
> "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
> "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
>   )
> addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
> mainClass in assembly := Some("RDDApp")
> test in assembly := {}
> {code}
> To reproduce it: 
> Just 
> {code}
> spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
> --executor-memory 4G \
> --executor-cores 1 \
> --num-executors 1 \
> --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
> {code}






[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Attachment: (was: Screen Shot 2017-05-16 at 1.47.06 pm.png)

> Memory Leak of RDD blocks 
> --
>
> Key: SPARK-20760
> URL: https://issues.apache.org/jira/browse/SPARK-20760
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.1.0
> Environment: Spark 2.1.0
>Reporter: Binzi Cao
>Priority: Critical
>
> Memory leak of RDD blocks in a long-running RDD process.
> I have a long-running application that performs computations on RDDs, and I 
> found that the number of RDD blocks keeps increasing in the Spark UI. The RDD 
> blocks and memory usage do not match the cached RDDs and their memory. It 
> looks like Spark keeps old RDDs in memory and never releases them, or never 
> gets a chance to release them. The job eventually dies with an out-of-memory 
> error. 
> In addition, I'm not seeing this issue in Spark 1.6. 
> Below is the minimized code; the issue is reproducible by simply running it 
> in local mode. 
> Scala file:
> {code}
> import scala.concurrent.duration.Duration
> import scala.util.{Try, Failure, Success}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.rdd.RDD
> import scala.concurrent._
> import ExecutionContext.Implicits.global
> case class Person(id: String, name: String)
> object RDDApp {
>   def run(sc: SparkContext) = {
> while (true) {
>   val r = scala.util.Random
>   val data = (1 to r.nextInt(100)).toList.map { a =>
> Person(a.toString, a.toString)
>   }
>   val rdd = sc.parallelize(data)
>   rdd.cache
>   println("running")
>   val a = (1 to 100).toList.map { x =>
> Future(rdd.filter(_.id == x.toString).collect)
>   }
>   a.foreach { f =>
> println(Await.ready(f, Duration.Inf).value.get)
>   }
>   rdd.unpersist()
> }
>   }
>   def main(args: Array[String]): Unit = {
>val conf = new SparkConf().setAppName("test")
> val sc   = new SparkContext(conf)
> run(sc)
>   }
> }
> {code}
> build sbt file:
> {code}
> name := "RDDTest"
> version := "0.1.1"
> scalaVersion := "2.11.5"
> libraryDependencies ++= Seq (
> "org.scalaz" %% "scalaz-core" % "7.2.0",
> "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
> "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
> "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
>   )
> addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
> mainClass in assembly := Some("RDDApp")
> test in assembly := {}
> {code}
> To reproduce it: 
> Just 
> {code}
> spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
> --executor-memory 4G \
> --executor-cores 1 \
> --num-executors 1 \
> --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
> {code}






[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Attachment: Screen Shot 2017-05-16 at 1.47.06 pm.png

RDD blocks grow dramatically after the app has been running for a couple of hours.

> Memory Leak of RDD blocks 
> --
>
> Key: SPARK-20760
> URL: https://issues.apache.org/jira/browse/SPARK-20760
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.1.0
> Environment: Spark 2.1.0
>Reporter: Binzi Cao
>Priority: Critical
> Attachments: Screen Shot 2017-05-16 at 1.47.06 pm.png
>
>
> Memory leak of RDD blocks in a long-running RDD process.
> I have a long-running application that performs computations on RDDs, and I 
> found that the number of RDD blocks keeps increasing in the Spark UI. The RDD 
> blocks and memory usage do not match the cached RDDs and their memory. It 
> looks like Spark keeps old RDDs in memory and never releases them, or never 
> gets a chance to release them. The job eventually dies with an out-of-memory 
> error. 
> In addition, I'm not seeing this issue in Spark 1.6. 
> Below is the minimized code; the issue is reproducible by simply running it 
> in local mode. 
> Scala file:
> {code}
> import scala.concurrent.duration.Duration
> import scala.util.{Try, Failure, Success}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.rdd.RDD
> import scala.concurrent._
> import ExecutionContext.Implicits.global
> case class Person(id: String, name: String)
> object RDDApp {
>   def run(sc: SparkContext) = {
> while (true) {
>   val r = scala.util.Random
>   val data = (1 to r.nextInt(100)).toList.map { a =>
> Person(a.toString, a.toString)
>   }
>   val rdd = sc.parallelize(data)
>   rdd.cache
>   println("running")
>   val a = (1 to 100).toList.map { x =>
> Future(rdd.filter(_.id == x.toString).collect)
>   }
>   a.foreach { f =>
> println(Await.ready(f, Duration.Inf).value.get)
>   }
>   rdd.unpersist()
> }
>   }
>   def main(args: Array[String]): Unit = {
>val conf = new SparkConf().setAppName("test")
> val sc   = new SparkContext(conf)
> run(sc)
>   }
> }
> {code}
> build sbt file:
> {code}
> name := "RDDTest"
> version := "0.1.1"
> scalaVersion := "2.11.5"
> libraryDependencies ++= Seq (
> "org.scalaz" %% "scalaz-core" % "7.2.0",
> "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
> "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
> "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
>   )
> addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
> mainClass in assembly := Some("RDDApp")
> test in assembly := {}
> {code}
> To reproduce it: 
> Just 
> {code}
> spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
> --executor-memory 4G \
> --executor-cores 1 \
> --num-executors 1 \
> --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
> {code}






[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Description: 
Memory leak of RDD blocks in a long-running RDD process.

I have a long-running application that performs computations on RDDs, and I 
found that the number of RDD blocks keeps increasing in the Spark UI. The RDD 
blocks and memory usage do not match the cached RDDs and their memory. It looks 
like Spark keeps old RDDs in memory and never releases them, or never gets a 
chance to release them. The job eventually dies with an out-of-memory error. 

In addition, I'm not seeing this issue in Spark 1.6. 

Below is the minimized code; the issue is reproducible by simply running it in 
local mode. 
Scala file:
{code}
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global
case class Person(id: String, name: String)
object RDDApp {
  def run(sc: SparkContext) = {
while (true) {
  val r = scala.util.Random
  val data = (1 to r.nextInt(100)).toList.map { a =>
Person(a.toString, a.toString)
  }
  val rdd = sc.parallelize(data)
  rdd.cache
  println("running")
  val a = (1 to 100).toList.map { x =>
Future(rdd.filter(_.id == x.toString).collect)
  }
  a.foreach { f =>
println(Await.ready(f, Duration.Inf).value.get)
  }
  rdd.unpersist()
}

  }
  def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setAppName("test")
val sc   = new SparkContext(conf)
run(sc)

  }
}

{code}
build sbt file:
{code}
name := "RDDTest"
version := "0.1.1"


scalaVersion := "2.11.5"

libraryDependencies ++= Seq (
"org.scalaz" %% "scalaz-core" % "7.2.0",
"org.scalaz" %% "scalaz-concurrent" % "7.2.0",
"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
"org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
  )

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
mainClass in assembly := Some("RDDApp")
test in assembly := {}
{code}

To reproduce it: 

Just 
{code}

spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
--executor-memory 4G \
--executor-cores 1 \
--num-executors 1 \
--class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
{code}


  was:
Memory leak of RDD blocks in a long-running RDD process.

I have a long-running application that performs computations on RDDs, and I 
found that the number of RDD blocks keeps increasing in the Spark UI. The RDD 
blocks and memory usage do not match the cached RDDs and their memory. It looks 
like Spark keeps old RDDs in memory and never releases them, or never gets a 
chance to release them. The job eventually dies with an out-of-memory error. 

In addition, I'm not seeing this issue in Spark 1.6. 

Below is the minimized code; the issue is reproducible by simply running it in 
local mode. 
Scala file:
{code}
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global
case class Person(id: String, name: String)
object RDDApp {
  def run(sc: SparkContext) = {
while (true) {
  val r = scala.util.Random
  val data = (1 to r.nextInt(100)).toList.map { a =>
Person(a.toString, a.toString)
  }
  val rdd = sc.parallelize(data)
  rdd.cache
  println("running")
  val a = (1 to 100).toList.map { x =>
Future(rdd.filter(_.id == x.toString).collect)
  }
  a.foreach { f =>
println(Await.ready(f, Duration.Inf).value.get)
  }
  rdd.unpersist()
}

  }
  def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setAppName("test")
val sc   = new SparkContext(conf)
run(sc)

  }
}

{code}
build sbt file:

name := "RDDTest"
version := "0.1.1"


scalaVersion := "2.11.5"

libraryDependencies ++= Seq (
"org.scalaz" %% "scalaz-core" % "7.2.0",
"org.scalaz" %% "scalaz-concurrent" % "7.2.0",
"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
"org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
  )

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
mainClass in assembly := Some("RDDApp")
test in assembly := {}

To reproduce it: 

Just 

spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
--executor-memory 4G \
--executor-cores 1 \
--num-executors 1 \
--class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar



> Memory Leak of RDD blocks 
> --
>
> Key: SPARK-20760
> URL: https://issues.apache.org/jira/browse/SPARK-20760
> Project: Spark
>  Issue Type: Bug

[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Description: 
Memory leak of RDD blocks in a long-running RDD process.

I have a long-running application that performs computations on RDDs, and I 
found that the number of RDD blocks keeps increasing in the Spark UI. The RDD 
blocks and memory usage do not match the cached RDDs and their memory. It looks 
like Spark keeps old RDDs in memory and never releases them, or never gets a 
chance to release them. The job eventually dies with an out-of-memory error. 

In addition, I'm not seeing this issue in Spark 1.6. 

Below is the minimized code; the issue is reproducible by simply running it in 
local mode. 
Scala file:
{code}
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global
case class Person(id: String, name: String)
object RDDApp {
  def run(sc: SparkContext) = {
while (true) {
  val r = scala.util.Random
  val data = (1 to r.nextInt(100)).toList.map { a =>
Person(a.toString, a.toString)
  }
  val rdd = sc.parallelize(data)
  rdd.cache
  println("running")
  val a = (1 to 100).toList.map { x =>
Future(rdd.filter(_.id == x.toString).collect)
  }
  a.foreach { f =>
println(Await.ready(f, Duration.Inf).value.get)
  }
  rdd.unpersist()
}

  }
  def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setAppName("test")
val sc   = new SparkContext(conf)
run(sc)

  }
}

{code}
build sbt file:

name := "RDDTest"
version := "0.1.1"


scalaVersion := "2.11.5"

libraryDependencies ++= Seq (
"org.scalaz" %% "scalaz-core" % "7.2.0",
"org.scalaz" %% "scalaz-concurrent" % "7.2.0",
"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
"org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
  )

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
mainClass in assembly := Some("RDDApp")
test in assembly := {}

To reproduce it: 

Just 

spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
--executor-memory 4G \
--executor-cores 1 \
--num-executors 1 \
--class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar


  was:
Memory leak of RDD blocks in a long-running RDD process.

I have a long-running application that performs computations on RDDs, and I 
found that the number of RDD blocks keeps increasing in the Spark UI. The RDD 
blocks and memory usage do not match the cached RDDs and their memory. It looks 
like Spark keeps old RDDs in memory and never releases them, or never gets a 
chance to release them. The job eventually dies with an out-of-memory error. 

In addition, I'm not seeing this issue in Spark 1.6. 

Below is the minimized code; the issue is reproducible by simply running it in 
local mode. 
Scala file:
{{{
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global
case class Person(id: String, name: String)
object RDDApp {
  def run(sc: SparkContext) = {
while (true) {
  val r = scala.util.Random
  val data = (1 to r.nextInt(100)).toList.map { a =>
Person(a.toString, a.toString)
  }
  val rdd = sc.parallelize(data)
  rdd.cache
  println("running")
  val a = (1 to 100).toList.map { x =>
Future(rdd.filter(_.id == x.toString).collect)
  }
  a.foreach { f =>
println(Await.ready(f, Duration.Inf).value.get)
  }
  rdd.unpersist()
}

  }
  def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setAppName("test")
val sc   = new SparkContext(conf)
run(sc)

  }
}

}}}
build sbt file:

name := "RDDTest"
version := "0.1.1"


scalaVersion := "2.11.5"

libraryDependencies ++= Seq (
"org.scalaz" %% "scalaz-core" % "7.2.0",
"org.scalaz" %% "scalaz-concurrent" % "7.2.0",
"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
"org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
  )

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
mainClass in assembly := Some("RDDApp")
test in assembly := {}

To reproduce it: 

Just 

spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
--executor-memory 4G \
--executor-cores 1 \
--num-executors 1 \
--class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar



> Memory Leak of RDD blocks 
> --
>
> Key: SPARK-20760
> URL: https://issues.apache.org/jira/browse/SPARK-20760
> Project: Spark
>  Issue Type: Bug
>  Components: Block 

[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Description: 
Memory leak of RDD blocks in a long-running RDD process.

I have a long-running application that performs computations on RDDs, and I 
found that the number of RDD blocks keeps increasing in the Spark UI. The RDD 
blocks and memory usage do not match the cached RDDs and their memory. It looks 
like Spark keeps old RDDs in memory and never releases them, or never gets a 
chance to release them. The job eventually dies with an out-of-memory error. 

In addition, I'm not seeing this issue in Spark 1.6. 

Below is the minimized code; the issue is reproducible by simply running it in 
local mode. 
Scala file:
{{{
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global
case class Person(id: String, name: String)
object RDDApp {
  def run(sc: SparkContext) = {
while (true) {
  val r = scala.util.Random
  val data = (1 to r.nextInt(100)).toList.map { a =>
Person(a.toString, a.toString)
  }
  val rdd = sc.parallelize(data)
  rdd.cache
  println("running")
  val a = (1 to 100).toList.map { x =>
Future(rdd.filter(_.id == x.toString).collect)
  }
  a.foreach { f =>
println(Await.ready(f, Duration.Inf).value.get)
  }
  rdd.unpersist()
}

  }
  def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setAppName("test")
val sc   = new SparkContext(conf)
run(sc)

  }
}

}}}
build sbt file:

name := "RDDTest"
version := "0.1.1"


scalaVersion := "2.11.5"

libraryDependencies ++= Seq (
"org.scalaz" %% "scalaz-core" % "7.2.0",
"org.scalaz" %% "scalaz-concurrent" % "7.2.0",
"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
"org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
  )

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
mainClass in assembly := Some("RDDApp")
test in assembly := {}

To reproduce it: 

Just 

spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
--executor-memory 4G \
--executor-cores 1 \
--num-executors 1 \
--class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar


  was:
Memory leak of RDD blocks in a long-running RDD process. I have a long-running 
application that performs calculations on RDDs, and I found that the number of 
RDD blocks keeps increasing. The RDD blocks and memory usage do not match the 
cached RDDs and their memory. It looks like Spark keeps old RDDs in memory and 
never releases them. In addition, I'm not seeing this issue in Spark 1.6. 

Below is the minimized code; the issue is reproducible by simply running it in 
local mode. 
Scala file:
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global
case class Person(id: String, name: String)
object RDDApp {
  def run(sc: SparkContext) = {
while (true) {
  val r = scala.util.Random
  val data = (1 to r.nextInt(100)).toList.map { a =>
Person(a.toString, a.toString)
  }
  val rdd = sc.parallelize(data)
  rdd.cache
  println("running")
  val a = (1 to 100).toList.map { x =>
Future(rdd.filter(_.id == x.toString).collect)
  }
  a.foreach { f =>
println(Await.ready(f, Duration.Inf).value.get)
  }
  rdd.unpersist()
}

  }
  def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setAppName("test")
val sc   = new SparkContext(conf)
run(sc)

  }
}
build sbt file:

name := "RDDTest"
version := "0.1.1"


scalaVersion := "2.11.5"

libraryDependencies ++= Seq (
"org.scalaz" %% "scalaz-core" % "7.2.0",
"org.scalaz" %% "scalaz-concurrent" % "7.2.0",
"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
"org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
  )

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
mainClass in assembly := Some("RDDApp")
test in assembly := {}

To reproduce it: 

Just 

spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
--executor-memory 4G \
--executor-cores 1 \
--num-executors 1 \
--class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar



> Memory Leak of RDD blocks 
> --
>
> Key: SPARK-20760
> URL: https://issues.apache.org/jira/browse/SPARK-20760
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.1.0
> Environment: Spark 2.1.0
>Reporter: Binzi Cao
>