Repository: spark
Updated Branches:
  refs/heads/branch-1.1 e19c70f95 -> 40bce6350


SPARK-5425: Use synchronised methods in system properties to create SparkConf

SPARK-5425: Fixed usages of system properties

This patch fixes a few problems caused by the fact that the Scala wrapper over 
system properties is not thread-safe and is basically invalid because it 
doesn't take into account the default values which could have been set in the 
properties object. The problem is fixed by modifying 
`Utils.getSystemProperties` method so that it uses `stringPropertyNames` method 
of the `Properties` class, which is thread-safe (internally it creates a 
defensive copy in a synchronized method) and returns keys of the properties 
which were set explicitly and which are defined as defaults.
The other related problem, which is also fixed here, was in the 
`ResetSystemProperties` mix-in: it created a copy of the system properties in 
the wrong way.

This patch also introduces a test case for the thread-safety of SparkConf 
creation.

Refer to the discussion in https://github.com/apache/spark/pull/4220 for more 
details.

Author: Jacek Lewandowski <[email protected]>

Closes #4220 from jacek-lewandowski/SPARK-5425-1.1 and squashes the following 
commits:

6c48a1f [Jacek Lewandowski] SPARK-5425: Modified Utils.getSystemProperties to 
return a map of all system properties - explicit + defaults
74b4489 [Jacek Lewandowski] SPARK-5425: Use SerializationUtils to save 
properties in ResetSystemProperties trait
685780e [Jacek Lewandowski] SPARK-5425: Use synchronised methods in system 
properties to create SparkConf


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40bce635
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40bce635
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40bce635

Branch: refs/heads/branch-1.1
Commit: 40bce6350a9af9862e0ffa29913b82bc2226aaa4
Parents: e19c70f
Author: Jacek Lewandowski <[email protected]>
Authored: Mon Feb 2 14:06:23 2015 -0800
Committer: Josh Rosen <[email protected]>
Committed: Mon Feb 2 14:06:23 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |  7 +++---
 .../scala/org/apache/spark/util/Utils.scala     | 11 ++++++---
 .../scala/org/apache/spark/SparkConfSuite.scala | 25 ++++++++++++++++++++
 .../spark/util/ResetSystemProperties.scala      |  7 +++++-
 .../spark/examples/DriverSubmissionTest.scala   |  4 +++-
 5 files changed, 46 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/40bce635/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 605df0e..068a034 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
+import org.apache.spark.util.Utils
+
 /**
  * Configuration for a Spark application. Used to set various Spark parameters 
as key-value pairs.
  *
@@ -49,8 +50,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with 
Logging {
 
   if (loadDefaults) {
     // Load any spark.* system properties
-    for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
-      settings(k) = v
+    for ((key, value) <- Utils.getSystemProperties if 
key.startsWith("spark.")) {
+      settings(key) = value
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/40bce635/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 4259145..90402ca 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1227,9 +1227,14 @@ private[spark] object Utils extends Logging {
     hashAbs
   }
 
-  /** Returns a copy of the system properties that is thread-safe to iterator 
over. */
-  def getSystemProperties(): Map[String, String] = {
-    
System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, 
String]
+  /** Returns the system properties map that is thread-safe to iterator over. 
It gets the
+    * properties which have been set explicitly, as well as those for which 
only a default value
+    * has been defined. */
+  def getSystemProperties: Map[String, String] = {
+    val sysProps = for (key <- System.getProperties.stringPropertyNames()) 
yield
+      (key, System.getProperty(key))
+
+    sysProps.toMap
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/40bce635/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index ae8a3c7..cc56d9c 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark
 
+import java.util.concurrent.{TimeUnit, Executors}
+
+import scala.util.{Try, Random}
+
 import org.scalatest.FunSuite
 
 import org.apache.spark.util.ResetSystemProperties
@@ -121,4 +125,25 @@ class SparkConfSuite extends FunSuite with 
LocalSparkContext with ResetSystemPro
     assert(conf.get("spark.test.a.b") === "A.B")
     assert(conf.get("spark.test.a.b.c") === "a.b.c")
   }
+
+  test("Thread safeness - SPARK-5425") {
+    import scala.collection.JavaConversions._
+    val executor = Executors.newSingleThreadScheduledExecutor()
+    val sf = executor.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit =
+        System.setProperty("spark.5425." + Random.nextInt(), 
Random.nextInt().toString)
+    }, 0, 1, TimeUnit.MILLISECONDS)
+
+    try {
+      val t0 = System.currentTimeMillis()
+      while ((System.currentTimeMillis() - t0) < 1000) {
+        val conf = Try(new SparkConf(loadDefaults = true))
+        assert(conf.isSuccess === true)
+      }
+    } finally {
+      executor.shutdownNow()
+      for (key <- System.getProperties.stringPropertyNames() if 
key.startsWith("spark.5425."))
+        System.getProperties.remove(key)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/40bce635/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala 
b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
index d4b92f3..bad1aa9 100644
--- a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
+++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
 
 import java.util.Properties
 
+import org.apache.commons.lang3.SerializationUtils
 import org.scalatest.{BeforeAndAfterEach, Suite}
 
 /**
@@ -42,7 +43,11 @@ private[spark] trait ResetSystemProperties extends 
BeforeAndAfterEach { this: Su
   var oldProperties: Properties = null
 
   override def beforeEach(): Unit = {
-    oldProperties = new Properties(System.getProperties)
+    // we need SerializationUtils.clone instead of `new 
Properties(System.getProperties()` because
+    // the later way of creating a copy does not copy the properties but it 
initializes a new
+    // Properties object with the given properties as defaults. They are not 
recognized at all
+    // by standard Scala wrapper over Java Properties then.
+    oldProperties = SerializationUtils.clone(System.getProperties)
     super.beforeEach()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/40bce635/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
index 65251e9..e757283 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.examples
 
 import scala.collection.JavaConversions._
 
+import org.apache.spark.util.Utils
+
 /** Prints out environmental information, sleeps, and then exits. Made to
   * test driver submission in the standalone scheduler. */
 object DriverSubmissionTest {
@@ -30,7 +32,7 @@ object DriverSubmissionTest {
     val numSecondsToSleep = args(0).toInt
 
     val env = System.getenv()
-    val properties = System.getProperties()
+    val properties = Utils.getSystemProperties
 
     println("Environment variables containing SPARK_TEST:")
     env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to