Repository: spark
Updated Branches:
  refs/heads/branch-1.1 e19c70f95 -> 40bce6350
SPARK-5425: Use synchronised methods in system properties to create SparkConf

SPARK-5425: Fixed usages of system properties

This patch fixes a few problems caused by the fact that the Scala wrapper over
system properties is not thread-safe and is basically invalid, because it does
not take into account the default values which could have been set in the
properties object. The problem is fixed by modifying the
`Utils.getSystemProperties` method so that it uses the `stringPropertyNames`
method of the `Properties` class, which is thread-safe (internally it creates a
defensive copy in a synchronized method) and returns the keys of the properties
which were set explicitly as well as those which are only defined as defaults.

The other related problem, which is fixed here, was in the
`ResetSystemProperties` mix-in: it created a copy of the system properties in
the wrong way.

This patch also introduces a test case for the thread-safety of SparkConf
creation.

Refer to the discussion in https://github.com/apache/spark/pull/4220 for more
details.

Author: Jacek Lewandowski <[email protected]>

Closes #4220 from jacek-lewandowski/SPARK-5425-1.1 and squashes the following commits:

6c48a1f [Jacek Lewandowski] SPARK-5425: Modified Utils.getSystemProperties to return a map of all system properties - explicit + defaults
74b4489 [Jacek Lewandowski] SPARK-5425: Use SerializationUtils to save properties in ResetSystemProperties trait
685780e [Jacek Lewandowski] SPARK-5425: Use synchronised methods in system properties to create SparkConf


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40bce635
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40bce635
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40bce635

Branch: refs/heads/branch-1.1
Commit: 40bce6350a9af9862e0ffa29913b82bc2226aaa4
Parents: e19c70f
Author: Jacek Lewandowski <[email protected]>
Authored: Mon Feb 2 14:06:23 2015 -0800
Committer: Josh Rosen <[email protected]>
Committed: Mon Feb 2 14:06:23 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |  7 +++---
 .../scala/org/apache/spark/util/Utils.scala     | 11 ++++++---
 .../scala/org/apache/spark/SparkConfSuite.scala | 25 ++++++++++++++++++++
 .../spark/util/ResetSystemProperties.scala      |  7 +++++-
 .../spark/examples/DriverSubmissionTest.scala   |  4 +++-
 5 files changed, 46 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/40bce635/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 605df0e..068a034 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
+import org.apache.spark.util.Utils
+
 /**
  * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
  *
@@ -49,8 +50,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   if (loadDefaults) {
     // Load any spark.* system properties
-    for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
-      settings(k) = v
+    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
+      settings(key) = value
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/40bce635/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 4259145..90402ca 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1227,9 +1227,14 @@ private[spark] object Utils extends Logging {
     hashAbs
   }
 
-  /** Returns a copy of the system properties that is thread-safe to iterator over. */
-  def getSystemProperties(): Map[String, String] = {
-    System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String]
+  /** Returns the system properties map that is thread-safe to iterator over. It gets the
+    * properties which have been set explicitly, as well as those for which only a default value
+    * has been defined. */
+  def getSystemProperties: Map[String, String] = {
+    val sysProps = for (key <- System.getProperties.stringPropertyNames()) yield
+      (key, System.getProperty(key))
+
+    sysProps.toMap
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/40bce635/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index ae8a3c7..cc56d9c 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark
 
+import java.util.concurrent.{TimeUnit, Executors}
+
+import scala.util.{Try, Random}
+
 import org.scalatest.FunSuite
 
 import org.apache.spark.util.ResetSystemProperties
@@ -121,4 +125,25 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     assert(conf.get("spark.test.a.b") === "A.B")
     assert(conf.get("spark.test.a.b.c") === "a.b.c")
   }
+
+  test("Thread safeness - SPARK-5425") {
+    import scala.collection.JavaConversions._
+    val executor = Executors.newSingleThreadScheduledExecutor()
+    val sf = executor.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit =
+        System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString)
+    }, 0, 1, TimeUnit.MILLISECONDS)
+
+    try {
+      val t0 = System.currentTimeMillis()
+      while ((System.currentTimeMillis() - t0) < 1000) {
+        val conf = Try(new SparkConf(loadDefaults = true))
+        assert(conf.isSuccess === true)
+      }
+    } finally {
+      executor.shutdownNow()
+      for (key <- System.getProperties.stringPropertyNames() if key.startsWith("spark.5425."))
+        System.getProperties.remove(key)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/40bce635/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
index d4b92f3..bad1aa9 100644
--- a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
+++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
 
 import java.util.Properties
 
+import org.apache.commons.lang3.SerializationUtils
 import org.scalatest.{BeforeAndAfterEach, Suite}
 
 /**
@@ -42,7 +43,11 @@ private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Su
   var oldProperties: Properties = null
 
   override def beforeEach(): Unit = {
-    oldProperties = new Properties(System.getProperties)
+    // we need SerializationUtils.clone instead of `new Properties(System.getProperties()` because
+    // the later way of creating a copy does not copy the properties but it initializes a new
+    // Properties object with the given properties as defaults. They are not recognized at all
+    // by standard Scala wrapper over Java Properties then.
+    oldProperties = SerializationUtils.clone(System.getProperties)
     super.beforeEach()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/40bce635/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
index 65251e9..e757283 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.examples
 
 import scala.collection.JavaConversions._
 
+import org.apache.spark.util.Utils
+
 /** Prints out environmental information, sleeps, and then exits. Made to
   * test driver submission in the standalone scheduler. */
 object DriverSubmissionTest {
@@ -30,7 +32,7 @@ object DriverSubmissionTest {
     val numSecondsToSleep = args(0).toInt
 
     val env = System.getenv()
-    val properties = System.getProperties()
+    val properties = Utils.getSystemProperties
 
     println("Environment variables containing SPARK_TEST:")
     env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println)
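
The `ResetSystemProperties` part of the fix hinges on a subtle `java.util.Properties`
behaviour: `new Properties(p)` does not copy `p`, it only installs `p` as the defaults
table, and defaults are invisible to the Scala wrapper and to `keySet`/`entrySet`.
Below is a minimal standalone sketch of that pitfall and of the `stringPropertyNames`
based snapshot that the patched `Utils.getSystemProperties` relies on. It is not part
of the patch; the object name and the sample key are made up for illustration.

import java.util.Properties

import scala.collection.JavaConverters._

// Illustrative only -- the object name and key are hypothetical.
object PropertiesDefaultsSketch {
  def main(args: Array[String]): Unit = {
    val original = new Properties()
    original.setProperty("spark.test.key", "value")

    // Pitfall: this does NOT copy the entries; `original` merely becomes the
    // defaults table of the new object, so the backing Hashtable stays empty.
    val copyViaDefaults = new Properties(original)
    println(copyViaDefaults.asScala.size)                   // 0 -- the Scala wrapper ignores defaults
    println(copyViaDefaults.getProperty("spark.test.key"))  // "value" -- only getProperty consults defaults

    // What the patched Utils.getSystemProperties does instead: stringPropertyNames()
    // builds a snapshot of the keys in a synchronized method and includes keys that
    // are defined only as defaults, so the resulting map is complete.
    val snapshot = copyViaDefaults.stringPropertyNames().asScala
      .map(key => key -> copyViaDefaults.getProperty(key)).toMap
    println(snapshot)                                       // Map(spark.test.key -> value)
  }
}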
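
The thread-safety aspect that the new "Thread safeness - SPARK-5425" test exercises can
also be reproduced outside the suite. The rough sketch below assumes it is run as its
own main; the `sketch.` key prefix is invented, and the old-style iteration only *may*
fail, since the race is timing-dependent.

import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.JavaConversions._
import scala.util.{Random, Try}

// Illustrative only -- reproduces the race the SparkConfSuite test guards against.
object ConcurrentPropertiesSketch {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    // Writer thread: keeps adding fresh system properties, like the test does.
    executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit =
        System.setProperty("sketch." + Random.nextInt(), Random.nextInt().toString)
    }, 0, 1, TimeUnit.MILLISECONDS)

    try {
      val deadline = System.currentTimeMillis() + 1000
      while (System.currentTimeMillis() < deadline) {
        // Old approach: iterating the Hashtable-backed Properties directly is
        // fail-fast and may throw ConcurrentModificationException mid-iteration.
        val unsafe = Try(System.getProperties.toMap[String, String])
        // New approach: stringPropertyNames() snapshots the key set in a
        // synchronized method, so building the map never sees a moving target.
        val safe = Try(System.getProperties.stringPropertyNames()
          .map(key => key -> System.getProperty(key)).toMap)
        assert(safe.isSuccess)
        if (unsafe.isFailure) println("old-style iteration failed: " + unsafe.failed.get)
      }
    } finally {
      executor.shutdownNow()
      for (key <- System.getProperties.stringPropertyNames() if key.startsWith("sketch."))
        System.getProperties.remove(key)
    }
  }
}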
