Repository: spark Updated Branches: refs/heads/master 0375134f4 -> 4740d6a15
[SPARK-6216] [PySpark] check the python version in worker Author: Davies Liu <dav...@databricks.com> Closes #5404 from davies/check_version and squashes the following commits: e559248 [Davies Liu] add tests ec33b5f [Davies Liu] check the python version in worker Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4740d6a1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4740d6a1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4740d6a1 Branch: refs/heads/master Commit: 4740d6a158cb4d35408a95265c5b950b9e9628a3 Parents: 0375134 Author: Davies Liu <dav...@databricks.com> Authored: Fri Apr 10 14:04:53 2015 -0700 Committer: Josh Rosen <joshro...@databricks.com> Committed: Fri Apr 10 14:04:53 2015 -0700 ---------------------------------------------------------------------- python/pyspark/rdd.py | 2 +- python/pyspark/tests.py | 16 ++++++++++++++++ python/pyspark/worker.py | 6 +++++- 3 files changed, 22 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4740d6a1/python/pyspark/rdd.py ---------------------------------------------------------------------- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index c8e54ed..c9ac95d 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2233,7 +2233,7 @@ class RDD(object): def _prepare_for_python_RDD(sc, command, obj=None): # the serialized command will be compressed by broadcast ser = CloudPickleSerializer() - pickled_command = ser.dumps(command) + pickled_command = ser.dumps((command, sys.version_info[:2])) if len(pickled_command) > (1 << 20): # 1M broadcast = sc.broadcast(pickled_command) pickled_command = ser.dumps(broadcast) http://git-wip-us.apache.org/repos/asf/spark/blob/4740d6a1/python/pyspark/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 0e3721b..b938b9c 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -35,6 +35,8 @@ import itertools import threading import hashlib +from py4j.protocol import Py4JJavaError + if sys.version_info[:2] <= (2, 6): try: import unittest2 as unittest @@ -1494,6 +1496,20 @@ class WorkerTests(PySparkTestCase): self.assertTrue(not t.isAlive()) self.assertEqual(100000, rdd.count()) + def test_with_different_versions_of_python(self): + rdd = self.sc.parallelize(range(10)) + rdd.count() + version = sys.version_info + sys.version_info = (2, 0, 0) + log4j = self.sc._jvm.org.apache.log4j + old_level = log4j.LogManager.getRootLogger().getLevel() + log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL) + try: + self.assertRaises(Py4JJavaError, lambda: rdd.count()) + finally: + sys.version_info = version + log4j.LogManager.getRootLogger().setLevel(old_level) + class SparkSubmitTests(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/spark/blob/4740d6a1/python/pyspark/worker.py ---------------------------------------------------------------------- diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 8a93c32..452d6fa 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -88,7 +88,11 @@ def main(infile, outfile): command = pickleSer._read_with_length(infile) if isinstance(command, Broadcast): command = pickleSer.loads(command.value) - (func, profiler, deserializer, serializer) = command + (func, profiler, deserializer, serializer), version = command + if version != sys.version_info[:2]: + raise Exception(("Python in worker has different version %s than that in " + + "driver %s, PySpark cannot run with different minor versions") % + (sys.version_info[:2], version)) init_time = time.time() def process(): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org