[ https://issues.apache.org/jira/browse/SPARK-31602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095122#comment-17095122 ]
angerszhu commented on SPARK-31602:
-----------------------------------

In HadoopRDD, if you don't set spark.hadoop.cloneConf=true, getJobConf() will put each new JobConf into the cached metadata map and never remove it. Maybe we should add a clear method?

{code:java}
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
  val conf: Configuration = broadcastedConf.value.value
  if (shouldCloneJobConf) {
    // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
    // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
    // somewhat rarely because most jobs treat the configuration as though it's immutable. One
    // solution, implemented here, is to clone the Configuration object. Unfortunately, this
    // clone can be very expensive. To avoid unexpected performance regressions for workloads and
    // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
    // disabled by default.
    HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
      logDebug("Cloning Hadoop Configuration")
      val newJobConf = new JobConf(conf)
      if (!conf.isInstanceOf[JobConf]) {
        initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
      }
      newJobConf
    }
  } else {
    if (conf.isInstanceOf[JobConf]) {
      logDebug("Re-using user-broadcasted JobConf")
      conf.asInstanceOf[JobConf]
    } else {
      Option(HadoopRDD.getCachedMetadata(jobConfCacheKey))
        .map { conf =>
          logDebug("Re-using cached JobConf")
          conf.asInstanceOf[JobConf]
        }
        .getOrElse {
          // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in
          // the local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
          // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary
          // objects. Synchronize to prevent ConcurrentModificationException (SPARK-1097,
          // HADOOP-10456).
          HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
            logDebug("Creating new JobConf and caching it for later re-use")
            val newJobConf = new JobConf(conf)
            initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
            HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
            newJobConf
          }
        }
    }
  }
}
{code}

There is no removal path for this cached job metadata:

{code:java}
/**
 * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
 * the local process.
 */
def getCachedMetadata(key: String): Any = SparkEnv.get.hadoopJobMetadata.get(key)

private def putCachedMetadata(key: String, value: Any): Unit =
  SparkEnv.get.hadoopJobMetadata.put(key, value)
{code}

For SQL on Hive data, each partition generates one JobConf, which is heavy.

> memory leak of JobConf
> ----------------------
>
>                 Key: SPARK-31602
>                 URL: https://issues.apache.org/jira/browse/SPARK-31602
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.0
>            Reporter: angerszhu
>            Priority: Major
>         Attachments: image-2020-04-29-14-34-39-496.png
>
> !image-2020-04-29-14-30-46-213.png!
>
> !image-2020-04-29-14-30-55-964.png!
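As a concrete shape for the clear method suggested in the comment above, a removal helper could simply mirror putCachedMetadata. This is a hypothetical sketch only: removeCachedMetadata does not exist in HadoopRDD today, it assumes SparkEnv.hadoopJobMetadata keeps its current map interface, and where to invoke it (for example from an RDD cleanup path) would still need to be worked out.

{code:java}
// Hypothetical sketch: a removal hook mirroring putCachedMetadata, so the
// JobConf cached under a given key (e.g. jobConfCacheKey) can be dropped once
// an RDD no longer needs it, instead of accumulating in the SparkEnv map.
private def removeCachedMetadata(key: String): Unit =
  SparkEnv.get.hadoopJobMetadata.remove(key)
{code}

If HadoopRDD called such a helper with its jobConfCacheKey when an RDD is cleaned up, the map's growth would be bounded for workloads that create many HadoopRDDs, such as SQL over heavily partitioned Hive tables.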