Repository: spark
Updated Branches:
  refs/heads/branch-1.1 327404d88 -> 2cd40db2b


[SPARK-2546] Clone JobConf for each task (branch-1.0 / 1.1 backport)

This patch attempts to fix SPARK-2546 in `branch-1.0` and `branch-1.1`.  The 
underlying problem is that thread-safety issues in Hadoop Configuration objects 
may cause Spark tasks to get stuck in infinite loops.  The approach taken here 
is to clone a new copy of the JobConf for each task rather than sharing a 
single copy between tasks.  Note that there are still Configuration 
thread-safety issues that may affect the driver, but these seem much less 
likely to occur in practice and will be more complex to fix (see discussion on 
the SPARK-2546 ticket).
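
For illustration, here is a minimal sketch of the approach (simplified from the
actual `HadoopRDD` changes below; the object and method names are invented for
this example):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

object JobConfCloneSketch {
  // Configuration's constructor is itself not thread-safe (HADOOP-10456),
  // so even the cloning must be serialized on a shared lock.
  private val CONF_LOCK = new Object()

  // Give each task its own JobConf copy instead of sharing one mutable
  // instance across concurrently running tasks.
  def cloneForTask(shared: Configuration): JobConf = CONF_LOCK.synchronized {
    new JobConf(shared) // copies the entries of `shared` into a fresh JobConf
  }
}
```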

This cloning is guarded by a new configuration option 
(`spark.hadoop.cloneConf`) and is disabled by default in order to avoid 
unexpected performance regressions for workloads that are unaffected by the 
Configuration thread-safety issues.
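
The option can be set like any other Spark property; for example, a driver
program might opt in as follows (the application name is illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Opt in to per-task JobConf cloning to work around SPARK-2546.
val conf = new SparkConf()
  .setAppName("hadoop-read-job") // illustrative
  .set("spark.hadoop.cloneConf", "true")
val sc = new SparkContext(conf)
```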

Author: Josh Rosen <joshro...@apache.org>

Closes #2684 from JoshRosen/jobconf-fix-backport and squashes the following commits:

f14f259 [Josh Rosen] Add configuration option to control cloning of Hadoop JobConf.
b562451 [Josh Rosen] Remove unused jobConfCacheKey field.
dd25697 [Josh Rosen] [SPARK-2546] [1.0 / 1.1 backport] Clone JobConf for each task.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2cd40db2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2cd40db2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2cd40db2

Branch: refs/heads/branch-1.1
Commit: 2cd40db2b3ab5ddcb323fd05c171dbd9025f9e71
Parents: 327404d
Author: Josh Rosen <joshro...@apache.org>
Authored: Sun Oct 19 00:31:06 2014 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Sun Oct 19 00:31:06 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 53 ++++++++++++++------
 docs/configuration.md                           |  9 ++++
 2 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2cd40db2/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index c862331..e3d6c5f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -129,27 +129,47 @@ class HadoopRDD[K, V](
   // used to build JobTracker ID
   private val createTime = new Date()
 
+  private val shouldCloneJobConf = sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
-    if (conf.isInstanceOf[JobConf]) {
-      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
-      conf.asInstanceOf[JobConf]
-    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-      // getJobConf() has been called previously, so there is already a local cache of the JobConf
-      // needed by this RDD.
-      HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
-    } else {
-      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
-      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
-      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456).
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546).  This problem occurs
+      // somewhat rarely because most jobs treat the configuration as though it's immutable.  One
+      // solution, implemented here, is to clone the Configuration object.  Unfortunately, this
+      // clone can be very expensive.  To avoid unexpected performance regressions for workloads and
+      // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
+      // disabled by default.
       HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
         val newJobConf = new JobConf(conf)
-        initLocalJobConfFuncOpt.map(f => f(newJobConf))
-        HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+        if (!conf.isInstanceOf[JobConf]) {
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+        }
         newJobConf
       }
+    } else {
+      if (conf.isInstanceOf[JobConf]) {
+        logDebug("Re-using user-broadcasted JobConf")
+        conf.asInstanceOf[JobConf]
+      } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+        logDebug("Re-using cached JobConf")
+        HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+      } else {
+        // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+        // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+        // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+        // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
+        HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+          logDebug("Creating new JobConf and caching it for later re-use")
+          val newJobConf = new JobConf(conf)
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+          HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+          newJobConf
+        }
+      }
     }
   }
 
@@ -257,7 +277,10 @@ class HadoopRDD[K, V](
 }
 
 private[spark] object HadoopRDD {
-  /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
+   */
+   */
   val CONFIGURATION_INSTANTIATION_LOCK = new Object()
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd40db2/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index a91967a..335650d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -583,6 +583,15 @@ Apart from these, the following properties are also available, and may be useful
     previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
 </tr>
 <tr>
+    <td><code>spark.hadoop.cloneConf</code></td>
+    <td>false</td>
+    <td>If set to true, clones a new Hadoop <code>Configuration</code> object for each task.  This
+    option should be enabled to work around <code>Configuration</code> thread-safety issues (see
+    <a href="https://issues.apache.org/jira/browse/SPARK-2546">SPARK-2546</a> for more details).
+    This is disabled by default in order to avoid unexpected performance regressions for jobs that
+    are not affected by these issues.</td>
+</tr>
+<tr>
     <td><code>spark.executor.heartbeatInterval</code></td>
     <td>10000</td>
     <td>Interval (milliseconds) between each executor's heartbeats to the driver.  Heartbeats let

