Repository: spark
Updated Branches:
  refs/heads/master 0375134f4 -> 4740d6a15


[SPARK-6216] [PySpark] check the python version in worker

Author: Davies Liu <dav...@databricks.com>

Closes #5404 from davies/check_version and squashes the following commits:

e559248 [Davies Liu] add tests
ec33b5f [Davies Liu] check the python version in worker


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4740d6a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4740d6a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4740d6a1

Branch: refs/heads/master
Commit: 4740d6a158cb4d35408a95265c5b950b9e9628a3
Parents: 0375134
Author: Davies Liu <dav...@databricks.com>
Authored: Fri Apr 10 14:04:53 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Fri Apr 10 14:04:53 2015 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py    |  2 +-
 python/pyspark/tests.py  | 16 ++++++++++++++++
 python/pyspark/worker.py |  6 +++++-
 3 files changed, 22 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4740d6a1/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index c8e54ed..c9ac95d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2233,7 +2233,7 @@ class RDD(object):
 def _prepare_for_python_RDD(sc, command, obj=None):
     # the serialized command will be compressed by broadcast
     ser = CloudPickleSerializer()
-    pickled_command = ser.dumps(command)
+    pickled_command = ser.dumps((command, sys.version_info[:2]))
     if len(pickled_command) > (1 << 20):  # 1M
         broadcast = sc.broadcast(pickled_command)
         pickled_command = ser.dumps(broadcast)
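
The driver-side change above packs the interpreter's (major, minor) tuple into the same
pickled payload as the command, so no extra field has to be added to the wire protocol.
A minimal sketch of the idea, using the standard pickle module as a stand-in for
PySpark's CloudPickleSerializer (the function name below is illustrative, not the actual API):

    import pickle
    import sys

    def pack_command(command):
        # Serialize the command together with the driver's (major, minor) version.
        return pickle.dumps((command, sys.version_info[:2]))

    payload = pack_command("placeholder for the real serialized task")
    command, version = pickle.loads(payload)
    assert version == sys.version_info[:2]
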

http://git-wip-us.apache.org/repos/asf/spark/blob/4740d6a1/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 0e3721b..b938b9c 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -35,6 +35,8 @@ import itertools
 import threading
 import hashlib
 
+from py4j.protocol import Py4JJavaError
+
 if sys.version_info[:2] <= (2, 6):
     try:
         import unittest2 as unittest
@@ -1494,6 +1496,20 @@ class WorkerTests(PySparkTestCase):
         self.assertTrue(not t.isAlive())
         self.assertEqual(100000, rdd.count())
 
+    def test_with_different_versions_of_python(self):
+        rdd = self.sc.parallelize(range(10))
+        rdd.count()
+        version = sys.version_info
+        sys.version_info = (2, 0, 0)
+        log4j = self.sc._jvm.org.apache.log4j
+        old_level = log4j.LogManager.getRootLogger().getLevel()
+        log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
+        try:
+            self.assertRaises(Py4JJavaError, lambda: rdd.count())
+        finally:
+            sys.version_info = version
+            log4j.LogManager.getRootLogger().setLevel(old_level)
+
 
 class SparkSubmitTests(unittest.TestCase):
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4740d6a1/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 8a93c32..452d6fa 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -88,7 +88,11 @@ def main(infile, outfile):
         command = pickleSer._read_with_length(infile)
         if isinstance(command, Broadcast):
             command = pickleSer.loads(command.value)
-        (func, profiler, deserializer, serializer) = command
+        (func, profiler, deserializer, serializer), version = command
+        if version != sys.version_info[:2]:
+            raise Exception(("Python in worker has different version %s than that in " +
+                            "driver %s, PySpark cannot run with different minor versions") %
+                            (sys.version_info[:2], version))
         init_time = time.time()
 
         def process():

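Taken together, the worker now fails fast with an explicit error instead of hitting
obscure deserialization failures when the driver and worker run different Python minor
versions. A self-contained sketch of the worker-side check, assuming the (major, minor)
tuple has already been unpacked from the payload as above (the helper name is illustrative):

    import sys

    def check_driver_version(driver_version):
        # Reject the task if the driver's (major, minor) differs from this interpreter's.
        if driver_version != sys.version_info[:2]:
            raise Exception("Python in worker has different version %s than that in "
                            "driver %s, PySpark cannot run with different minor versions"
                            % (sys.version_info[:2], driver_version))

    check_driver_version(sys.version_info[:2])   # matching versions: no error
    try:
        check_driver_version((2, 0))             # simulated mismatch
    except Exception as e:
        print(e)
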
