spark git commit: [SPARK-10611] Clone Configuration for each task for NewHadoopRDD

2015-09-18 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 348d7c9a9 -> 8074208fa


[SPARK-10611] Clone Configuration for each task for NewHadoopRDD

This patch attempts to fix the Hadoop Configuration thread safety issue for 
NewHadoopRDD in the same way SPARK-2546 fixed the issue for HadoopRDD.

Author: Mingyu Kim 

Closes #8763 from mingyukim/mkim/SPARK-10611.
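
The cloning is opt-in via the same flag SPARK-2546 introduced. A minimal
sketch of enabling it from a driver program, assuming a hypothetical
application name (SparkConf and SparkContext are the standard entry points):

    import org.apache.spark.{SparkConf, SparkContext}

    // Opt in to per-task cloning of the Hadoop Configuration.
    val sparkConf = new SparkConf()
      .setAppName("clone-conf-example") // hypothetical app name
      .set("spark.hadoop.cloneConf", "true")
    val sc = new SparkContext(sparkConf)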


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8074208f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8074208f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8074208f

Branch: refs/heads/master
Commit: 8074208fa47fa654c1055c48cfa0d923edeeb04f
Parents: 348d7c9
Author: Mingyu Kim 
Authored: Fri Sep 18 15:40:58 2015 -0700
Committer: Josh Rosen 
Committed: Fri Sep 18 15:40:58 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/BinaryFileRDD.scala |  5 +++--
 .../org/apache/spark/rdd/NewHadoopRDD.scala  | 37 +++++++++++++++++++++++++++++++------
 2 files changed, 34 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8074208f/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 6fec00d..aedced7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -34,12 +34,13 @@ private[spark] class BinaryFileRDD[T](
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
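
The local `val conf` above is not cosmetic: once cloning is enabled, every
getConf call may produce a fresh copy, so hoisting the result means
getPartitions pays for at most one clone. A hedged sketch of the difference
(surrounding class elided):

    // Before: two getConf calls, hence up to two Configuration clones.
    //   configurable.setConf(getConf)
    //   val jobContext = newJobContext(getConf, jobId)

    // After: a single clone shared by both call sites.
    val conf = getConf
    configurable.setConf(conf)
    val jobContext = newJobContext(conf, jobId)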

http://git-wip-us.apache.org/repos/asf/spark/blob/8074208f/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 174979a..2872b93 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition(
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
   override def hashCode(): Int = 41 * (41 + rddId) + index
 }
 
 
@@ -84,6 +83,27 @@ class NewHadoopRDD[K, V](
 
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
+  private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+
+  def getConf: Configuration = {
+    val conf: Configuration = confBroadcast.value.value
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546, SPARK-10611).  This
+      // problem occurs somewhat rarely because most jobs treat the configuration as though it's
+      // immutable.  One solution, implemented here, is to clone the Configuration object.
+      // Unfortunately, this clone can be very expensive.  To avoid unexpected performance
+      // regressions for workloads and Hadoop versions that do not suffer from these thread-safety
+      // issues, this cloning is disabled by default.
+      NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        new Configuration(conf)
+      }
+    } else {
+      conf
+    }
+  }
+
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
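
The hunk above combines two ideas: copy-on-read via Configuration's copy
constructor, and a process-wide lock because constructing a Configuration
concurrently is itself unsafe (HADOOP-10456). A standalone sketch of the
same pattern, with names mirroring the patch; this is an illustration, not
the Spark source:

    import org.apache.hadoop.conf.Configuration

    object ConfCloning {
      // new Configuration() is unsafe to run concurrently (HADOOP-10456),
      // so instantiation is serialized behind a global lock.
      private val CONFIGURATION_INSTANTIATION_LOCK = new Object()

      // Hand each caller a private copy of a shared Configuration.
      def cloneConf(shared: Configuration): Configuration =
        CONFIGURATION_INSTANTIATION_LOCK.synchronized {
          new Configuration(shared) // Configuration's copy constructor
        }
    }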
@@ -104,7 +124,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = getConf
 
       val inputMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
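
With compute now calling getConf, concurrent tasks in the same executor JVM
no longer share one mutable Configuration when cloning is enabled. A hedged
demonstration of the isolation, using plain threads to stand in for Spark
tasks (the config key is hypothetical):

    import org.apache.hadoop.conf.Configuration

    val shared = new Configuration() // stands in for the broadcast conf
    val lock = new Object()          // stands in for the instantiation lock

    val tasks = (1 to 2).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = {
          // Each "task" clones under the lock, then mutates only its copy.
          val taskConf = lock.synchronized(new Configuration(shared))
          taskConf.set("example.task.id", i.toString) // hypothetical key
        }
      })
    }
    tasks.foreach(_.start())
    tasks.foreach(_.join())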
@@ -230,12 +250,16 @@ class NewHadoopRDD[K, V](
     super.persist(storageLevel)
   }
 
-
-  def getConf: Configuration = confBroadcast.value.value
 }
 
 private[spark] object NewHadoopRDD {
   /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new Configuration().
+   */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
+  /**

spark git commit: [SPARK-10611] Clone Configuration for each task for NewHadoopRDD

2015-09-18 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 4051fffaa -> a6c315358


[SPARK-10611] Clone Configuration for each task for NewHadoopRDD

This patch attempts to fix the Hadoop Configuration thread safety issue for 
NewHadoopRDD in the same way SPARK-2546 fixed the issue for HadoopRDD.

Author: Mingyu Kim 

Closes #8763 from mingyukim/mkim/SPARK-10611.

(cherry picked from commit 8074208fa47fa654c1055c48cfa0d923edeeb04f)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6c31535
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6c31535
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6c31535

Branch: refs/heads/branch-1.5
Commit: a6c315358b4517c461beabd5cd319d56d9fddd57
Parents: 4051fff
Author: Mingyu Kim 
Authored: Fri Sep 18 15:40:58 2015 -0700
Committer: Josh Rosen 
Committed: Fri Sep 18 16:24:40 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/BinaryFileRDD.scala |  1 +
 .../org/apache/spark/rdd/NewHadoopRDD.scala  | 33 +++++++++++++++++++++++++++++----
 2 files changed, 30 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6c31535/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 1f755db..a9e78e8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -34,6 +34,7 @@ private[spark] class BinaryFileRDD[T](
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
         configurable.setConf(conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6c31535/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6a9c004..796151e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition(
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
   override def hashCode(): Int = 41 * (41 + rddId) + index
 }
 
 
@@ -84,6 +83,27 @@ class NewHadoopRDD[K, V](
 
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
+  private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+
+  def getConf: Configuration = {
+    val conf: Configuration = confBroadcast.value.value
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546, SPARK-10611).  This
+      // problem occurs somewhat rarely because most jobs treat the configuration as though it's
+      // immutable.  One solution, implemented here, is to clone the Configuration object.
+      // Unfortunately, this clone can be very expensive.  To avoid unexpected performance
+      // regressions for workloads and Hadoop versions that do not suffer from these thread-safety
+      // issues, this cloning is disabled by default.
+      NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        new Configuration(conf)
+      }
+    } else {
+      conf
+    }
+  }
+
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
@@ -104,7 +124,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = getConf
 
       val inputMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
@@ -230,12 +250,16 @@ class NewHadoopRDD[K, V](
     super.persist(storageLevel)
   }
 
-
-  def getConf: Configuration = confBroadcast.value.value
 }
 
 private[spark] object NewHadoopRDD {
   /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new Configuration().
+   */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
+  /**
    *