Repository: spark
Updated Branches:
  refs/heads/branch-1.5 4051fffaa -> a6c315358


[SPARK-10611] Clone Configuration for each task for NewHadoopRDD

This patch attempts to fix the Hadoop Configuration thread safety issue for 
NewHadoopRDD in the same way SPARK-2546 fixed the issue for HadoopRDD.

Author: Mingyu Kim <m...@palantir.com>

Closes #8763 from mingyukim/mkim/SPARK-10611.

(cherry picked from commit 8074208fa47fa654c1055c48cfa0d923edeeb04f)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6c31535
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6c31535
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6c31535

Branch: refs/heads/branch-1.5
Commit: a6c315358b4517c461beabd5cd319d56d9fddd57
Parents: 4051fff
Author: Mingyu Kim <m...@palantir.com>
Authored: Fri Sep 18 15:40:58 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Fri Sep 18 16:24:40 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/BinaryFileRDD.scala    |  1 +
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 33 +++++++++++++++++---
 2 files changed, 30 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6c31535/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 1f755db..a9e78e8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -34,6 +34,7 @@ private[spark] class BinaryFileRDD[T](
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
         configurable.setConf(conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6c31535/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6a9c004..796151e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition(
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
   override def hashCode(): Int = 41 * (41 + rddId) + index
 }
 
@@ -84,6 +83,27 @@ class NewHadoopRDD[K, V](
 
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
+  private val shouldCloneJobConf = 
sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+
+  def getConf: Configuration = {
+    val conf: Configuration = confBroadcast.value.value
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to 
various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546, 
SPARK-10611).  This
+      // problem occurs somewhat rarely because most jobs treat the 
configuration as though it's
+      // immutable.  One solution, implemented here, is to clone the 
Configuration object.
+      // Unfortunately, this clone can be very expensive.  To avoid unexpected 
performance
+      // regressions for workloads and Hadoop versions that do not suffer from 
these thread-safety
+      // issues, this cloning is disabled by default.
+      NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        new Configuration(conf)
+      }
+    } else {
+      conf
+    }
+  }
+
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
@@ -104,7 +124,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = getConf
 
       val inputMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
@@ -230,12 +250,16 @@ class NewHadoopRDD[K, V](
     super.persist(storageLevel)
   }
 
-
-  def getConf: Configuration = confBroadcast.value.value
 }
 
 private[spark] object NewHadoopRDD {
   /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and 
HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new Configuration().
+   */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
+  /**
    * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an 
InputSplit to
    * the given function rather than the index of the partition.
    */
@@ -268,6 +292,7 @@ private[spark] class WholeTextFileRDD(
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
         configurable.setConf(conf)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to