[ https://issues.apache.org/jira/browse/SPARK-31602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095122#comment-17095122 ]

angerszhu commented on SPARK-31602:
-----------------------------------

In HadoopRDD, if you don't set spark.hadoop.cloneConf=true, it will put a new 
JobConf into the cached metadata and never remove it. Maybe we should add a 
clear method?
{code:scala}
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
  val conf: Configuration = broadcastedConf.value.value
  if (shouldCloneJobConf) {
    // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
    // one job modifies a configuration while another reads it (SPARK-2546).  This problem occurs
    // somewhat rarely because most jobs treat the configuration as though it's immutable.  One
    // solution, implemented here, is to clone the Configuration object.  Unfortunately, this
    // clone can be very expensive.  To avoid unexpected performance regressions for workloads and
    // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
    // disabled by default.
    HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
      logDebug("Cloning Hadoop Configuration")
      val newJobConf = new JobConf(conf)
      if (!conf.isInstanceOf[JobConf]) {
        initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
      }
      newJobConf
    }
  } else {
    if (conf.isInstanceOf[JobConf]) {
      logDebug("Re-using user-broadcasted JobConf")
      conf.asInstanceOf[JobConf]
    } else {
      Option(HadoopRDD.getCachedMetadata(jobConfCacheKey))
        .map { conf =>
          logDebug("Re-using cached JobConf")
          conf.asInstanceOf[JobConf]
        }
        .getOrElse {
          // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in
          // the local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
          // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary
          // objects. Synchronize to prevent ConcurrentModificationException (SPARK-1097,
          // HADOOP-10456).
          HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
            logDebug("Creating new JobConf and caching it for later re-use")
            val newJobConf = new JobConf(conf)
            initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
            HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
            newJobConf
          }
        }
    }
  }
}
{code}
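As a workaround, setting spark.hadoop.cloneConf=true makes getJobConf() take 
the cloning branch above, so nothing is ever put into the cache, at the cost 
of cloning the Configuration on every call. A minimal sketch of setting the 
flag:
{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: with cloneConf=true, HadoopRDD.getJobConf() clones the broadcast
// Configuration instead of caching a JobConf per RDD in SparkEnv's
// hadoopJobMetadata map. Trade-off: each call pays for a full copy.
val conf = new SparkConf()
  .setAppName("cloneConf-demo")
  .set("spark.hadoop.cloneConf", "true")
val sc = new SparkContext(conf)
{code}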
There is no removal path for this cached job metadata:
{code:scala}
/**
 * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
 * the local process.
 */
def getCachedMetadata(key: String): Any = SparkEnv.get.hadoopJobMetadata.get(key)

private def putCachedMetadata(key: String, value: Any): Unit =
  SparkEnv.get.hadoopJobMetadata.put(key, value)
{code}
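What I have in mind for the clear method is something like the following. 
This is only a rough sketch; removeCachedMetadata does not exist in Spark 
today:
{code:scala}
// Rough sketch only, not existing Spark code: a removal helper that mirrors
// putCachedMetadata, so an RDD could drop its JobConf entry on cleanup
// instead of waiting for the soft-value map to be collected under memory
// pressure.
private def removeCachedMetadata(key: String): Unit =
  SparkEnv.get.hadoopJobMetadata.remove(key)
{code}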
For SQL on Hive data, each partition will generate one JobConf in this cache; 
that's heavy.

 

> memory leak of JobConf
> ----------------------
>
>                 Key: SPARK-31602
>                 URL: https://issues.apache.org/jira/browse/SPARK-31602
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.0
>            Reporter: angerszhu
>            Priority: Major
>         Attachments: image-2020-04-29-14-34-39-496.png
>
>
> !image-2020-04-29-14-30-46-213.png!
>  
> !image-2020-04-29-14-30-55-964.png!


