[ https://issues.apache.org/jira/browse/SPARK-44900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17759130#comment-17759130 ]

Yauheni Audzeichyk edited comment on SPARK-44900 at 8/25/23 6:38 PM:
---------------------------------------------------------------------

[~yao] we tried spark.cleaner.periodicGC.interval=1min, but it didn't help.
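For reference, a sketch of how such a setting is applied (example only; the actual deployment may pass it differently, e.g. via --conf at submit time):
{code:scala}
// example only: spark.cleaner.periodicGC.interval applied on the SparkConf,
// the same way the repro below builds its configuration
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.master", "yarn")
  .set("spark.cleaner.periodicGC.interval", "1min")
{code}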

Here are my observations:
 * this happens even in a very simple scenario - see the example to reproduce below
 * it happens after a join
 * the uncontrollable growth of disk usage starts only when some portion of the cached RDD gets spilled to disk
 * if the cached RDD stays 100% in memory, the issue doesn't happen
 * when an executor dies, "Size on Disk" on the Storage tab drops by the size of the storage blocks held by that dead executor (which makes sense)

It looks like some storage blocks (shuffle blocks?) are being tracked under that cached RDD and are never released (or at least not within a reasonable time) until the executor dies.

Our worry is whether this is just a disk usage tracking bug or whether those blocks are actually kept on disk, because our production job's disk usage (per the Spark UI) grew by 6 TB over a span of 10 hours.
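To help tell those two cases apart, here is a minimal monitoring sketch (logStorage is an illustrative helper, not something from our job) that prints what the Storage tab tracks for each cached RDD; the reported diskSize can then be compared with the real size of the block files under the executors' spark.local.dir:
{code:scala}
// Illustrative helper: dump the driver's view of every cached RDD.
// If diskSize keeps climbing here while the block files on the executors
// stay small, it is a tracking/accounting issue; if the files grow too,
// the blocks are really being kept on disk.
def logStorage(sc: org.apache.spark.SparkContext): Unit = {
  sc.getRDDStorageInfo.foreach { info =>
    println(s"rdd=${info.id} name=${info.name} level=${info.storageLevel} " +
      s"cached=${info.numCachedPartitions}/${info.numPartitions} " +
      s"memMB=${info.memSize >> 20} diskMB=${info.diskSize >> 20}")
  }
}
{code}
Calling this every few micro-batches (e.g. from the foreachRDD loop in the repro below) is enough to watch the growth.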

Here's the code to reproduce:
{code:scala}
import scala.collection.mutable
import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{md5, rand}
import org.apache.spark.sql.types.StringType
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().set("spark.master", "yarn")
val spark = SparkSession.builder().config(conf).getOrCreate()

import spark.implicits._

val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(10))
// create a pseudo stream
val rddQueue = new mutable.Queue[RDD[Long]]()
val stream = ssc.queueStream(rddQueue, oneAtATime = true)
// create a simple lookup table
val lookup = sc.range(start = 0, end = 50000000, numSlices = 10)
    .toDF("id")
    .withColumn("value", md5(rand().cast(StringType)))
    .cache()
// for every micro-batch perform value lookup via join
stream.foreachRDD { rdd =>
  val df = rdd.toDF("id")
  df.join(lookup, Seq("id"), "leftouter").count()
}
// run the streaming
ssc.start()
for (_ <- 1 to 1000000) {
  rddQueue.synchronized {
    val firstId = Random.nextInt(50000000)
    rddQueue += sc.range(start = firstId, end = firstId + 10000, numSlices = 4)
  }
  Thread.sleep(10)
}
ssc.stop()
{code}
Submit parameters (chosen to create a storage memory deficit so that part of the cache spills to disk):
{code:bash}
--executor-cores 2 --num-executors 5 --executor-memory 1250m --driver-memory 1g \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.sql.shuffle.partitions=10
{code}
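Put together, a full submit command looks roughly like this (the class and jar names are placeholders):
{code:bash}
# placeholder class/jar names; the relevant parts are the resource settings and confs above
spark-submit \
  --master yarn --deploy-mode client \
  --class CacheGrowthRepro \
  --executor-cores 2 --num-executors 5 \
  --executor-memory 1250m --driver-memory 1g \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.sql.shuffle.partitions=10 \
  cache-growth-repro.jar
{code}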
When executed, the disk usage of the cached lookup DataFrame grows very quickly.

The same thing happens in Spark 2.4 and 3.3.



> Cached DataFrame keeps growing
> ------------------------------
>
>                 Key: SPARK-44900
>                 URL: https://issues.apache.org/jira/browse/SPARK-44900
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.0
>            Reporter: Varun Nalla
>            Priority: Blocker
>
> Scenario:
> We have a Kafka streaming application where data lookups happen by joining 
> another DataFrame which is cached, and the caching strategy is 
> MEMORY_AND_DISK.
> However, the size of the cached DataFrame keeps growing with every 
> micro-batch the streaming application processes, and that is visible under 
> the Storage tab.
> A similar Stack Overflow thread was already raised:
> https://stackoverflow.com/questions/55601779/spark-dataframe-cache-keeps-growing


