spark git commit: [SPARK-10611] Clone Configuration for each task for NewHadoopRDD
Repository: spark Updated Branches: refs/heads/master 348d7c9a9 -> 8074208fa [SPARK-10611] Clone Configuration for each task for NewHadoopRDD This patch attempts to fix the Hadoop Configuration thread safety issue for NewHadoopRDD in the same way SPARK-2546 fixed the issue for HadoopRDD. Author: Mingyu KimCloses #8763 from mingyukim/mkim/SPARK-10611. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8074208f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8074208f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8074208f Branch: refs/heads/master Commit: 8074208fa47fa654c1055c48cfa0d923edeeb04f Parents: 348d7c9 Author: Mingyu Kim Authored: Fri Sep 18 15:40:58 2015 -0700 Committer: Josh Rosen Committed: Fri Sep 18 15:40:58 2015 -0700 -- .../org/apache/spark/rdd/BinaryFileRDD.scala| 5 +-- .../org/apache/spark/rdd/NewHadoopRDD.scala | 37 2 files changed, 34 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8074208f/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala index 6fec00d..aedced7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala @@ -34,12 +34,13 @@ private[spark] class BinaryFileRDD[T]( override def getPartitions: Array[Partition] = { val inputFormat = inputFormatClass.newInstance +val conf = getConf inputFormat match { case configurable: Configurable => -configurable.setConf(getConf) +configurable.setConf(conf) case _ => } -val jobContext = newJobContext(getConf, jobId) +val jobContext = newJobContext(conf, jobId) inputFormat.setMinPartitions(jobContext, minPartitions) val rawSplits = inputFormat.getSplits(jobContext).toArray val result = new Array[Partition](rawSplits.size) http://git-wip-us.apache.org/repos/asf/spark/blob/8074208f/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 174979a..2872b93 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition( extends Partition { val serializableHadoopSplit = new SerializableWritable(rawSplit) - override def hashCode(): Int = 41 * (41 + rddId) + index } @@ -84,6 +83,27 @@ class NewHadoopRDD[K, V]( @transient protected val jobId = new JobID(jobTrackerId, id) + private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false) + + def getConf: Configuration = { +val conf: Configuration = confBroadcast.value.value +if (shouldCloneJobConf) { + // Hadoop Configuration objects are not thread-safe, which may lead to various problems if + // one job modifies a configuration while another reads it (SPARK-2546, SPARK-10611). This + // problem occurs somewhat rarely because most jobs treat the configuration as though it's + // immutable. One solution, implemented here, is to clone the Configuration object. + // Unfortunately, this clone can be very expensive. To avoid unexpected performance + // regressions for workloads and Hadoop versions that do not suffer from these thread-safety + // issues, this cloning is disabled by default. + NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized { +logDebug("Cloning Hadoop Configuration") +new Configuration(conf) + } +} else { + conf +} + } + override def getPartitions: Array[Partition] = { val inputFormat = inputFormatClass.newInstance inputFormat match { @@ -104,7 +124,7 @@ class NewHadoopRDD[K, V]( val iter = new Iterator[(K, V)] { val split = theSplit.asInstanceOf[NewHadoopPartition] logInfo("Input split: " + split.serializableHadoopSplit) - val conf = confBroadcast.value.value + val conf = getConf val inputMetrics = context.taskMetrics .getInputMetricsForReadMethod(DataReadMethod.Hadoop) @@ -230,12 +250,16 @@ class NewHadoopRDD[K, V]( super.persist(storageLevel) } - - def getConf: Configuration = confBroadcast.value.value } private[spark] object NewHadoopRDD { /** + * Configuration's constructor
spark git commit: [SPARK-10611] Clone Configuration for each task for NewHadoopRDD
Repository: spark Updated Branches: refs/heads/branch-1.5 4051fffaa -> a6c315358 [SPARK-10611] Clone Configuration for each task for NewHadoopRDD This patch attempts to fix the Hadoop Configuration thread safety issue for NewHadoopRDD in the same way SPARK-2546 fixed the issue for HadoopRDD. Author: Mingyu KimCloses #8763 from mingyukim/mkim/SPARK-10611. (cherry picked from commit 8074208fa47fa654c1055c48cfa0d923edeeb04f) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6c31535 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6c31535 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6c31535 Branch: refs/heads/branch-1.5 Commit: a6c315358b4517c461beabd5cd319d56d9fddd57 Parents: 4051fff Author: Mingyu Kim Authored: Fri Sep 18 15:40:58 2015 -0700 Committer: Josh Rosen Committed: Fri Sep 18 16:24:40 2015 -0700 -- .../org/apache/spark/rdd/BinaryFileRDD.scala| 1 + .../org/apache/spark/rdd/NewHadoopRDD.scala | 33 +--- 2 files changed, 30 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a6c31535/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala index 1f755db..a9e78e8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala @@ -34,6 +34,7 @@ private[spark] class BinaryFileRDD[T]( override def getPartitions: Array[Partition] = { val inputFormat = inputFormatClass.newInstance +val conf = getConf inputFormat match { case configurable: Configurable => configurable.setConf(conf) http://git-wip-us.apache.org/repos/asf/spark/blob/a6c31535/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 6a9c004..796151e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition( extends Partition { val serializableHadoopSplit = new SerializableWritable(rawSplit) - override def hashCode(): Int = 41 * (41 + rddId) + index } @@ -84,6 +83,27 @@ class NewHadoopRDD[K, V]( @transient protected val jobId = new JobID(jobTrackerId, id) + private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false) + + def getConf: Configuration = { +val conf: Configuration = confBroadcast.value.value +if (shouldCloneJobConf) { + // Hadoop Configuration objects are not thread-safe, which may lead to various problems if + // one job modifies a configuration while another reads it (SPARK-2546, SPARK-10611). This + // problem occurs somewhat rarely because most jobs treat the configuration as though it's + // immutable. One solution, implemented here, is to clone the Configuration object. + // Unfortunately, this clone can be very expensive. To avoid unexpected performance + // regressions for workloads and Hadoop versions that do not suffer from these thread-safety + // issues, this cloning is disabled by default. + NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized { +logDebug("Cloning Hadoop Configuration") +new Configuration(conf) + } +} else { + conf +} + } + override def getPartitions: Array[Partition] = { val inputFormat = inputFormatClass.newInstance inputFormat match { @@ -104,7 +124,7 @@ class NewHadoopRDD[K, V]( val iter = new Iterator[(K, V)] { val split = theSplit.asInstanceOf[NewHadoopPartition] logInfo("Input split: " + split.serializableHadoopSplit) - val conf = confBroadcast.value.value + val conf = getConf val inputMetrics = context.taskMetrics .getInputMetricsForReadMethod(DataReadMethod.Hadoop) @@ -230,12 +250,16 @@ class NewHadoopRDD[K, V]( super.persist(storageLevel) } - - def getConf: Configuration = confBroadcast.value.value } private[spark] object NewHadoopRDD { /** + * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456). + * Therefore, we synchronize on this lock before calling new Configuration(). + */ + val CONFIGURATION_INSTANTIATION_LOCK = new Object() + + /** *