Repository: spark
Updated Branches:
  refs/heads/master 23405f324 -> 5b77e66dd


[SPARK-17387][PYSPARK] Creating SparkContext() from python without spark-submit 
ignores user conf

## What changes were proposed in this pull request?

The root cause of SparkConf being ignored when launching the JVM is that
SparkConf requires the JVM to be created first.
https://github.com/apache/spark/blob/master/python/pyspark/conf.py#L106
In this PR, I defer launching the JVM until SparkContext is created so
that SparkConf can be passed to the JVM correctly.

## How was this patch tested?

Use the example code in the description of SPARK-17387,
```
$ SPARK_HOME=$PWD PYTHONPATH=python:python/lib/py4j-0.10.3-src.zip python
Python 2.7.12 (default, Jul  1 2016, 15:12:24)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from pyspark import SparkContext
>>> from pyspark import SparkConf
>>> conf = SparkConf().set("spark.driver.memory", "4g")
>>> sc = SparkContext(conf=conf)
```
And verify that spark.driver.memory is correctly picked up:

```
...op/ -Xmx4g org.apache.spark.deploy.SparkSubmit --conf spark.driver.memory=4g 
pyspark-shell
```

Author: Jeff Zhang <zjf...@apache.org>

Closes #14959 from zjffdu/SPARK-17387.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b77e66d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b77e66d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b77e66d

Branch: refs/heads/master
Commit: 5b77e66dd6a128c5992ab3bde418613f84be7009
Parents: 23405f3
Author: Jeff Zhang <zjf...@apache.org>
Authored: Tue Oct 11 14:56:26 2016 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Oct 11 14:56:26 2016 -0700

----------------------------------------------------------------------
 python/pyspark/conf.py         | 71 +++++++++++++++++++++++++++----------
 python/pyspark/context.py      | 16 ++++++---
 python/pyspark/java_gateway.py | 13 +++++--
 3 files changed, 75 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5b77e66d/python/pyspark/conf.py
----------------------------------------------------------------------
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index 924da3e..64b6f23 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -52,6 +52,14 @@ spark.home=/path
 >>> sorted(conf.getAll(), key=lambda p: p[0])
 [(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', 
u'value3'), \
 (u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')]
+>>> conf._jconf.setExecutorEnv("VAR5", "value5")
+JavaObject id...
+>>> print(conf.toDebugString())
+spark.executorEnv.VAR1=value1
+spark.executorEnv.VAR3=value3
+spark.executorEnv.VAR4=value4
+spark.executorEnv.VAR5=value5
+spark.home=/path
 """
 
 __all__ = ['SparkConf']
@@ -101,13 +109,24 @@ class SparkConf(object):
             self._jconf = _jconf
         else:
             from pyspark.context import SparkContext
-            SparkContext._ensure_initialized()
             _jvm = _jvm or SparkContext._jvm
-            self._jconf = _jvm.SparkConf(loadDefaults)
+
+            if _jvm is not None:
+                # JVM is created, so create self._jconf directly through JVM
+                self._jconf = _jvm.SparkConf(loadDefaults)
+                self._conf = None
+            else:
+                # JVM is not created, so store data in self._conf first
+                self._jconf = None
+                self._conf = {}
 
     def set(self, key, value):
         """Set a configuration property."""
-        self._jconf.set(key, unicode(value))
+        # Try to set self._jconf first if JVM is created, set self._conf if 
JVM is not created yet.
+        if self._jconf is not None:
+            self._jconf.set(key, unicode(value))
+        else:
+            self._conf[key] = unicode(value)
         return self
 
     def setIfMissing(self, key, value):
@@ -118,17 +137,17 @@ class SparkConf(object):
 
     def setMaster(self, value):
         """Set master URL to connect to."""
-        self._jconf.setMaster(value)
+        self.set("spark.master", value)
         return self
 
     def setAppName(self, value):
         """Set application name."""
-        self._jconf.setAppName(value)
+        self.set("spark.app.name", value)
         return self
 
     def setSparkHome(self, value):
         """Set path where Spark is installed on worker nodes."""
-        self._jconf.setSparkHome(value)
+        self.set("spark.home", value)
         return self
 
     def setExecutorEnv(self, key=None, value=None, pairs=None):
@@ -136,10 +155,10 @@ class SparkConf(object):
         if (key is not None and pairs is not None) or (key is None and pairs 
is None):
             raise Exception("Either pass one key-value pair or a list of 
pairs")
         elif key is not None:
-            self._jconf.setExecutorEnv(key, value)
+            self.set("spark.executorEnv." + key, value)
         elif pairs is not None:
             for (k, v) in pairs:
-                self._jconf.setExecutorEnv(k, v)
+                self.set("spark.executorEnv." + k, v)
         return self
 
     def setAll(self, pairs):
@@ -149,35 +168,49 @@ class SparkConf(object):
         :param pairs: list of key-value pairs to set
         """
         for (k, v) in pairs:
-            self._jconf.set(k, v)
+            self.set(k, v)
         return self
 
     def get(self, key, defaultValue=None):
         """Get the configured value for some key, or return a default 
otherwise."""
         if defaultValue is None:   # Py4J doesn't call the right get() if we 
pass None
-            if not self._jconf.contains(key):
-                return None
-            return self._jconf.get(key)
+            if self._jconf is not None:
+                if not self._jconf.contains(key):
+                    return None
+                return self._jconf.get(key)
+            else:
+                if key not in self._conf:
+                    return None
+                return self._conf[key]
         else:
-            return self._jconf.get(key, defaultValue)
+            if self._jconf is not None:
+                return self._jconf.get(key, defaultValue)
+            else:
+                return self._conf.get(key, defaultValue)
 
     def getAll(self):
         """Get all values as a list of key-value pairs."""
-        pairs = []
-        for elem in self._jconf.getAll():
-            pairs.append((elem._1(), elem._2()))
-        return pairs
+        if self._jconf is not None:
+            return [(elem._1(), elem._2()) for elem in self._jconf.getAll()]
+        else:
+            return self._conf.items()
 
     def contains(self, key):
         """Does this configuration contain a given key?"""
-        return self._jconf.contains(key)
+        if self._jconf is not None:
+            return self._jconf.contains(key)
+        else:
+            return key in self._conf
 
     def toDebugString(self):
         """
         Returns a printable version of the configuration, as a list of
         key=value pairs, one per line.
         """
-        return self._jconf.toDebugString()
+        if self._jconf is not None:
+            return self._jconf.toDebugString()
+        else:
+            return '\n'.join('%s=%s' % (k, v) for k, v in self._conf.items())
 
 
 def _test():

http://git-wip-us.apache.org/repos/asf/spark/blob/5b77e66d/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a3dd195..1b2e199 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -109,7 +109,7 @@ class SparkContext(object):
         ValueError:...
         """
         self._callsite = first_spark_call() or CallSite(None, None, None)
-        SparkContext._ensure_initialized(self, gateway=gateway)
+        SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
         try:
             self._do_init(master, appName, sparkHome, pyFiles, environment, 
batchSize, serializer,
                           conf, jsc, profiler_cls)
@@ -121,7 +121,15 @@ class SparkContext(object):
     def _do_init(self, master, appName, sparkHome, pyFiles, environment, 
batchSize, serializer,
                  conf, jsc, profiler_cls):
         self.environment = environment or {}
-        self._conf = conf or SparkConf(_jvm=self._jvm)
+        # java gateway must have been launched at this point.
+        if conf is not None and conf._jconf is not None:
+            # conf has been initialized in JVM properly, so use conf directly. 
This represent the
+            # scenario that JVM has been launched before SparkConf is created 
(e.g. SparkContext is
+            # created and then stopped, and we create a new SparkConf and new 
SparkContext again)
+            self._conf = conf
+        else:
+            self._conf = SparkConf(_jvm=SparkContext._jvm)
+
         self._batchSize = batchSize  # -1 represents an unlimited batch size
         self._unbatched_serializer = serializer
         if batchSize == 0:
@@ -232,14 +240,14 @@ class SparkContext(object):
         return self._jvm.JavaSparkContext(jconf)
 
     @classmethod
-    def _ensure_initialized(cls, instance=None, gateway=None):
+    def _ensure_initialized(cls, instance=None, gateway=None, conf=None):
         """
         Checks whether a SparkContext is initialized or not.
         Throws error if a SparkContext is already running.
         """
         with SparkContext._lock:
             if not SparkContext._gateway:
-                SparkContext._gateway = gateway or launch_gateway()
+                SparkContext._gateway = gateway or launch_gateway(conf)
                 SparkContext._jvm = SparkContext._gateway.jvm
 
             if instance:

http://git-wip-us.apache.org/repos/asf/spark/blob/5b77e66d/python/pyspark/java_gateway.py
----------------------------------------------------------------------
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index f76cadc..c1cf843 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -32,7 +32,12 @@ from py4j.java_gateway import java_import, JavaGateway, 
GatewayClient
 from pyspark.serializers import read_int
 
 
-def launch_gateway():
+def launch_gateway(conf=None):
+    """
+    launch jvm gateway
+    :param conf: spark configuration passed to spark-submit
+    :return:
+    """
     if "PYSPARK_GATEWAY_PORT" in os.environ:
         gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
     else:
@@ -41,13 +46,17 @@ def launch_gateway():
         # proper classpath and settings from spark-env.sh
         on_windows = platform.system() == "Windows"
         script = "./bin/spark-submit.cmd" if on_windows else 
"./bin/spark-submit"
+        command = [os.path.join(SPARK_HOME, script)]
+        if conf:
+            for k, v in conf.getAll():
+                command += ['--conf', '%s=%s' % (k, v)]
         submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
         if os.environ.get("SPARK_TESTING"):
             submit_args = ' '.join([
                 "--conf spark.ui.enabled=false",
                 submit_args
             ])
-        command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)
+        command = command + shlex.split(submit_args)
 
         # Start a socket that will be used by PythonGatewayServer to 
communicate its port to us
         callback_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to