[ https://issues.apache.org/jira/browse/SPARK-44900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17759130#comment-17759130 ]
Yauheni Audzeichyk edited comment on SPARK-44900 at 8/25/23 6:38 PM:
---------------------------------------------------------------------

[~yao] we tried spark.cleaner.periodicGC.interval=1min but it didn't help. Here are my observations:
 * this happens even in a very simple scenario - see the example to reproduce below
 * it happens after a join
 * uncontrollable growth of disk usage starts only when any portion of the cached RDD gets spilled to disk
 * if the cached RDD remains 100% in memory, the issue doesn't happen
 * when an executor dies, "Size on Disk" on the Storage tab drops by the amount of storage blocks held by that dead executor (which makes sense)

It looks like some storage blocks (shuffle blocks?) are being tracked under that cached RDD and are never (or at least not within a reasonable time) released until the executor dies. Our worry is whether this is just a disk-usage tracking bug or whether those blocks are actually kept on disk, because our production job's disk usage (per the Spark UI) grew by 6 TB in a span of 10 hours.

Here's the code to reproduce:
{code:java}
import scala.collection.mutable
import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{md5, rand}
import org.apache.spark.sql.types.StringType
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().set("spark.master", "yarn")
val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._

val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(10))

// create a pseudo stream
val rddQueue = new mutable.Queue[RDD[Long]]()
val stream = ssc.queueStream(rddQueue, oneAtATime = true)

// create a simple lookup table
val lookup = sc.range(start = 0, end = 50000000, numSlices = 10)
  .toDF("id")
  .withColumn("value", md5(rand().cast(StringType)))
  .cache()

// for every micro-batch perform value lookup via join
stream.foreachRDD { rdd =>
  val df = rdd.toDF("id")
  df.join(lookup, Seq("id"), "leftouter").count()
}

// run the streaming
ssc.start()
for (_ <- 1 to 1000000) {
  rddQueue.synchronized {
    val firstId = Random.nextInt(50000000)
    rddQueue += sc.range(start = firstId, end = firstId + 10000, numSlices = 4)
  }
  Thread.sleep(10)
}
ssc.stop()
{code}
Submit parameters (chosen to create a storage memory deficit and force the cache to spill):
{code:java}
--executor-cores 2 --num-executors 5 --executor-memory 1250m --driver-memory 1g \
--conf spark.dynamicAllocation.enabled=false --conf spark.sql.shuffle.partitions=10
{code}
When executed, disk usage of that cached lookup DF grows really fast. The same thing happens in Spark 2.4 and 3.3.
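To watch the growth outside the UI, a minimal driver-side probe along these lines could be run next to the repro above. This is not part of the original reproduction; it relies on the developer API SparkContext.getRDDStorageInfo (which may change between versions) and simply logs the same memory/disk numbers the Storage tab reports:
{code:java}
// Monitoring sketch (assumption: executed on the driver alongside the repro loop).
// SparkContext.getRDDStorageInfo is a DeveloperApi, so treat this as illustrative.
import org.apache.spark.SparkContext

def logCachedStorage(sc: SparkContext): Unit = {
  sc.getRDDStorageInfo.foreach { info =>
    println(f"rdd=${info.name} cachedPartitions=${info.numCachedPartitions}/${info.numPartitions} " +
      f"memMB=${info.memSize / 1e6}%.1f diskMB=${info.diskSize / 1e6}%.1f")
  }
}

// e.g. call it periodically from the driver loop that feeds rddQueue:
// logCachedStorage(sc)
{code}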
> Cached DataFrame keeps growing
> ------------------------------
>
>                 Key: SPARK-44900
>                 URL: https://issues.apache.org/jira/browse/SPARK-44900
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.0
>            Reporter: Varun Nalla
>            Priority: Blocker
>
> Scenario:
> We have a Kafka streaming application where data lookups happen by joining another DataFrame that is cached, and the caching strategy is MEMORY_AND_DISK.
> However, the size of the cached DataFrame keeps growing with every micro-batch the streaming application processes, and that is visible under the Storage tab.
> A similar Stack Overflow thread was already raised:
> https://stackoverflow.com/questions/55601779/spark-dataframe-cache-keeps-growing
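For context, the pattern the report describes (a lookup DataFrame persisted with MEMORY_AND_DISK and joined against every Kafka micro-batch) could look roughly like the sketch below. All names, the broker/topic, the lookup source, and the choice of Structured Streaming are illustrative assumptions, not details from the actual job:
{code:java}
// Illustrative sketch only - approximates the pattern described in the report.
// Broker address, topic, lookup path and column names are hypothetical.
// Requires the spark-sql-kafka connector on the classpath.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

object CachedLookupJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cached-lookup-join").getOrCreate()

    // Lookup side, persisted MEMORY_AND_DISK as described in the report.
    val lookup: DataFrame = spark.read.parquet("/data/lookup") // hypothetical source
      .persist(StorageLevel.MEMORY_AND_DISK)

    val kafkaStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // hypothetical
      .option("subscribe", "events")                    // hypothetical topic
      .load()

    // Per the report, joining every micro-batch against the cached lookup is
    // where the Storage tab's "Size on Disk" for the lookup keeps growing.
    val processBatch: (DataFrame, Long) => Unit = (batch, _) =>
      batch.selectExpr("CAST(key AS STRING) AS id")
        .join(lookup, Seq("id"), "leftouter")
        .count()

    kafkaStream.writeStream
      .foreachBatch(processBatch)
      .start()
      .awaitTermination()
  }
}
{code}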