Repository: spark
Updated Branches:
  refs/heads/master bff65b5cc -> 5a5526164


SPARK-5425: Use synchronised methods in system properties to create SparkConf

SPARK-5425: Fixed usages of system properties

This patch fixes few problems caused by the fact that the Scala wrapper over 
system properties is not thread-safe and is basically invalid because it 
doesn't take into account the default values which could have been set in the 
properties object. The problem is fixed by modifying 
`Utils.getSystemProperties` method so that it uses `stringPropertyNames` method 
of the `Properties` class, which is thread-safe (internally it creates a 
defensive copy in a synchronized method) and returns the keys of both the 
properties which were set explicitly and those which are defined only as 
defaults.
The other related problem, which is also fixed here, was in 
`ResetSystemProperties` mix-in. It created a copy of the system properties in 
the wrong way.

This patch also introduces a test case for the thread-safety of SparkConf 
creation.

Refer to the discussion in https://github.com/apache/spark/pull/4220 for more 
details.

Author: Jacek Lewandowski <[email protected]>

Closes #4222 from jacek-lewandowski/SPARK-5425-1.3 and squashes the following 
commits:

03da61b [Jacek Lewandowski] SPARK-5425: Modified Utils.getSystemProperties to 
return a map of all system properties - explicit + defaults
8faf2ea [Jacek Lewandowski] SPARK-5425: Use SerializationUtils to save 
properties in ResetSystemProperties trait
71aa572 [Jacek Lewandowski] SPARK-5425: Use synchronised methods in system 
properties to create SparkConf


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a552616
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a552616
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a552616

Branch: refs/heads/master
Commit: 5a5526164bdf9ecf1306d4570e816eb4df5cfd2b
Parents: bff65b5
Author: Jacek Lewandowski <[email protected]>
Authored: Mon Feb 2 14:07:19 2015 -0800
Committer: Josh Rosen <[email protected]>
Committed: Mon Feb 2 14:07:19 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |  5 ++--
 .../scala/org/apache/spark/util/Utils.scala     | 11 ++++++---
 .../scala/org/apache/spark/SparkConfSuite.scala | 25 ++++++++++++++++++++
 .../spark/util/ResetSystemProperties.scala      |  7 +++++-
 .../spark/examples/DriverSubmissionTest.scala   |  4 +++-
 5 files changed, 45 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5a552616/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index cd91c8f..4d4c69d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
 
 import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.util.Utils
 
 /**
  * Configuration for a Spark application. Used to set various Spark parameters 
as key-value pairs.
@@ -53,8 +54,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with 
Logging {
 
   if (loadDefaults) {
     // Load any spark.* system properties
-    for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
-      set(k, v)
+    for ((key, value) <- Utils.getSystemProperties if 
key.startsWith("spark.")) {
+      set(key, value)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a552616/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 703b23a..31850b5 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1347,9 +1347,14 @@ private[spark] object Utils extends Logging {
     hashAbs
   }
 
-  /** Returns a copy of the system properties that is thread-safe to iterator 
over. */
-  def getSystemProperties(): Map[String, String] = {
-    
System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, 
String]
+  /** Returns the system properties map that is thread-safe to iterator over. 
It gets the
+    * properties which have been set explicitly, as well as those for which 
only a default value
+    * has been defined. */
+  def getSystemProperties: Map[String, String] = {
+    val sysProps = for (key <- System.getProperties.stringPropertyNames()) 
yield
+      (key, System.getProperty(key))
+
+    sysProps.toMap
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5a552616/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 790976a..e08210a 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark
 
+import java.util.concurrent.{TimeUnit, Executors}
+
+import scala.util.{Try, Random}
+
 import org.scalatest.FunSuite
 import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
 import org.apache.spark.util.ResetSystemProperties
@@ -123,6 +127,27 @@ class SparkConfSuite extends FunSuite with 
LocalSparkContext with ResetSystemPro
     assert(conf.get("spark.test.a.b.c") === "a.b.c")
   }
 
+  test("Thread safeness - SPARK-5425") {
+    import scala.collection.JavaConversions._
+    val executor = Executors.newSingleThreadScheduledExecutor()
+    val sf = executor.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit =
+        System.setProperty("spark.5425." + Random.nextInt(), 
Random.nextInt().toString)
+    }, 0, 1, TimeUnit.MILLISECONDS)
+
+    try {
+      val t0 = System.currentTimeMillis()
+      while ((System.currentTimeMillis() - t0) < 1000) {
+        val conf = Try(new SparkConf(loadDefaults = true))
+        assert(conf.isSuccess === true)
+      }
+    } finally {
+      executor.shutdownNow()
+      for (key <- System.getProperties.stringPropertyNames() if 
key.startsWith("spark.5425."))
+        System.getProperties.remove(key)
+    }
+  }
+
   test("register kryo classes through registerKryoClasses") {
     val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a552616/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala 
b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
index d4b92f3..bad1aa9 100644
--- a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
+++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
 
 import java.util.Properties
 
+import org.apache.commons.lang3.SerializationUtils
 import org.scalatest.{BeforeAndAfterEach, Suite}
 
 /**
@@ -42,7 +43,11 @@ private[spark] trait ResetSystemProperties extends 
BeforeAndAfterEach { this: Su
   var oldProperties: Properties = null
 
   override def beforeEach(): Unit = {
-    oldProperties = new Properties(System.getProperties)
+    // we need SerializationUtils.clone instead of `new 
Properties(System.getProperties()` because
+    // the later way of creating a copy does not copy the properties but it 
initializes a new
+    // Properties object with the given properties as defaults. They are not 
recognized at all
+    // by standard Scala wrapper over Java Properties then.
+    oldProperties = SerializationUtils.clone(System.getProperties)
     super.beforeEach()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a552616/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
index 65251e9..e757283 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.examples
 
 import scala.collection.JavaConversions._
 
+import org.apache.spark.util.Utils
+
 /** Prints out environmental information, sleeps, and then exits. Made to
   * test driver submission in the standalone scheduler. */
 object DriverSubmissionTest {
@@ -30,7 +32,7 @@ object DriverSubmissionTest {
     val numSecondsToSleep = args(0).toInt
 
     val env = System.getenv()
-    val properties = System.getProperties()
+    val properties = Utils.getSystemProperties
 
     println("Environment variables containing SPARK_TEST:")
     env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to