[ https://issues.apache.org/jira/browse/SPARK-17842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sreelal S L updated SPARK-17842:
--------------------------------
    Comment: was deleted

(was: Spark Streaming's window operation internally uses UnionRDD to merge all the RDDs (one created per batch) in the window. UnionRDD decides whether to list partitions in parallel with this check:
isPartitionListingParallel: Boolean = rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)
In our case, the RDD count per window is well above 10, so UnionRDD uses parallel partition computation, backed by:
partitionEvalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
This pool is created every slide interval, but ForkJoinPool.shutdown() is never called anywhere. This is likely why scala.concurrent.forkjoin.ForkJoinTask[] instances leak, and it is causing a thread leak.)

> Thread and memory leak in WindowDstream (UnionRDD ) when parallelPartition computation gets enabled.
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-17842
>                 URL: https://issues.apache.org/jira/browse/SPARK-17842
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Streaming
>    Affects Versions: 2.0.0
>         Environment: Yarn cluster, Eclipse Dev Env
>            Reporter: Sreelal S L
>            Priority: Critical
>
> We noticed a steady increase in ForkJoinTask instances in the driver process heap and found the root cause to be UnionRDD.
> WindowedDStream internally uses UnionRDD, which computes partitions in parallel using a parallel collection with ForkJoinPool task support:
> partitionEvalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
> The pool is created each time a UnionRDD is created, but it is never shut down. This leaks threads and memory on every slide interval of the window.
> The problem is easily reproducible with the code below. Just keep watching the number of threads.
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.dstream.ConstantInputDStream
>
> object TestLeak {
>   def main(args: Array[String]): Unit = {
>     val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TestLeak")
>     val ssc = new StreamingContext(sparkConf, Seconds(1))
>     ssc.checkpoint("checkpoint")
>     val rdd = ssc.sparkContext.parallelize(List(1, 2, 3))
>     // Emits the same RDD in every 1-second batch.
>     val constStream = new ConstantInputDStream[Int](ssc, rdd)
>     // 20-second window sliding every second: once full, each window unions 20 RDDs.
>     constStream.window(Seconds(20), Seconds(1)).print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
> This happens only when the number of RDDs to be unioned is above the value of spark.rdd.parallelListingThreshold (10 by default).
> Currently I'm working around it by setting this threshold to a higher value.
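The threshold condition can be made concrete with a minimal Spark-free sketch in Scala. It assumes, as in the repro above, that the stream produces one RDD per 1-second batch, so a 20-second window unions 20 RDDs once it has filled. The object and method names (ParallelListingCheck, rddsPerWindow, minSafeThreshold) are hypothetical. Only the comparison rdds.length > spark.rdd.parallelListingThreshold (default 10) comes from the report.

```scala
// Hypothetical model of the UnionRDD check quoted in the report; no Spark needed.
object ParallelListingCheck {
  val DefaultThreshold = 10

  // Same strict comparison as the quoted isPartitionListingParallel.
  def isPartitionListingParallel(numRdds: Int, threshold: Int = DefaultThreshold): Boolean =
    numRdds > threshold

  // Assumption: one RDD per batch, so a full window holds windowSec / batchSec RDDs.
  def rddsPerWindow(windowSec: Int, batchSec: Int): Int = windowSec / batchSec

  // Smallest threshold that keeps listing sequential, given the strict ">" above.
  def minSafeThreshold(windowSec: Int, batchSec: Int): Int = rddsPerWindow(windowSec, batchSec)

  def main(args: Array[String]): Unit = {
    val n = rddsPerWindow(windowSec = 20, batchSec = 1)
    println(s"RDDs per window: $n")                                          // 20
    println(s"parallel with default threshold: ${isPartitionListingParallel(n)}") // true
    println(s"threshold needed to stay sequential: ${minSafeThreshold(20, 1)}")   // 20
  }
}
```

Because the check uses a strict `>`, the workaround only takes effect when the threshold is at least the number of RDDs per window. For the repro, that means something like sparkConf.set("spark.rdd.parallelListingThreshold", "20") or higher. This avoids the parallel path rather than fixing the pool leak itself.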
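The report describes a leak pattern: a new ForkJoinPool(8) for every UnionRDD, never shut down. This pattern can be modelled without Spark in a rough sketch. It uses java.util.concurrent.ForkJoinPool rather than the scala.concurrent.forkjoin classes mentioned in the comment. All names (PoolStats, Eval, UnionLike, LeakyUnion, SharedUnion, LeakDemo) are hypothetical.

The sketch counts pool creations over 20 simulated slide intervals. The per-instance version creates one pool per slide. The alternative, which shares a single lazily created pool, creates at most one. Sharing a pool is only one possible remedy, offered here as an assumption. Calling shutdown() once an object no longer needs its pool would be another.

```scala
import java.util.concurrent.{Callable, ForkJoinPool}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter so the demo can report how many pools were created.
object PoolStats {
  val created = new AtomicInteger(0)
  def newPool(parallelism: Int): ForkJoinPool = {
    created.incrementAndGet()
    new ForkJoinPool(parallelism)
  }
}

// Runs a trivial computation on the given pool and blocks for the result.
object Eval {
  def sumOn(pool: ForkJoinPool, parts: Seq[Int]): Int =
    pool.submit(new Callable[Int] { def call(): Int = parts.sum }).get()
}

trait UnionLike { def evaluate(): Int }

// Pattern from the report: each instance builds its own ForkJoinPool(8)
// and nothing ever calls shutdown() on it.
class LeakyUnion(parts: Seq[Int]) extends UnionLike {
  private lazy val pool = PoolStats.newPool(8)
  def evaluate(): Int = Eval.sumOn(pool, parts)
}

// Assumed alternative: all instances share one lazily created pool.
class SharedUnion(parts: Seq[Int]) extends UnionLike {
  def evaluate(): Int = Eval.sumOn(SharedUnion.pool, parts)
}
object SharedUnion {
  lazy val pool: ForkJoinPool = PoolStats.newPool(8)
}

object LeakDemo {
  // Creates and evaluates one union object per simulated slide interval,
  // returning how many new pools were created along the way.
  def poolsCreatedBy(slides: Int)(make: Seq[Int] => UnionLike): Int = {
    val before = PoolStats.created.get()
    (1 to slides).foreach(_ => make(Seq(1, 2, 3)).evaluate())
    PoolStats.created.get() - before
  }

  def main(args: Array[String]): Unit = {
    println(s"leaky pools created: ${poolsCreatedBy(20)(new LeakyUnion(_))}")
    println(s"shared pools created: ${poolsCreatedBy(20)(new SharedUnion(_))}")
  }
}
```

Pool-creation counts are deterministic. Live thread counts (for example Thread.activeCount()) depend on how long idle pool workers are kept alive, which is why the sketch counts pools rather than threads.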