Repository: systemml
Updated Branches:
  refs/heads/master 0ba9e74b9 -> b34079a28
[MINOR] Enable SystemML to be imported in the PySpark workers

Moved the race-condition avoidance logic to the classloader instead of
the import level. This avoids the creation of a DataFrame (and hence a
SparkSession) in PySpark worker processes.

Closes #652.

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/b34079a2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/b34079a2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/b34079a2

Branch: refs/heads/master
Commit: b34079a283a1859ed23de77f4ff0e50985b57dd3
Parents: 0ba9e74
Author: Niketan Pansare <npan...@us.ibm.com>
Authored: Tue Sep 5 09:59:59 2017 -0700
Committer: Niketan Pansare <npan...@us.ibm.com>
Committed: Tue Sep 5 10:01:55 2017 -0700

----------------------------------------------------------------------
 src/main/python/systemml/classloader.py | 14 +++++++++++++-
 src/main/python/systemml/mlcontext.py   |  4 ----
 2 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/b34079a2/src/main/python/systemml/classloader.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/classloader.py b/src/main/python/systemml/classloader.py
index 8738dc5..015a3dc 100644
--- a/src/main/python/systemml/classloader.py
+++ b/src/main/python/systemml/classloader.py
@@ -22,15 +22,27 @@
 __all__ = ['createJavaObject']
 
 import os
+import numpy as np
+import pandas as pd
 
 try:
     import py4j.java_gateway
     from py4j.java_gateway import JavaObject
     from pyspark import SparkContext
+    from pyspark.sql import SparkSession
 except ImportError:
     raise ImportError('Unable to import `pyspark`. Hint: Make sure you are running with PySpark.')
 
+_initializedSparkSession = False
 def _createJavaObject(sc, obj_type):
+    # -----------------------------------------------------------------------------------
+    # Avoids race condition between locking of metastore_db of Scala SparkSession and PySpark SparkSession.
+    # This is done at toDF() rather than import level to avoid creation of SparkSession in worker processes.
+    global _initializedSparkSession
+    if not _initializedSparkSession:
+        _initializedSparkSession = True
+        SparkSession.builder.getOrCreate().createDataFrame(pd.DataFrame(np.array([[1,2],[3,4]])))
+    # -----------------------------------------------------------------------------------
     if obj_type == 'mlcontext':
         return sc._jvm.org.apache.sysml.api.mlcontext.MLContext(sc._jsc)
     elif obj_type == 'dummy':
@@ -89,4 +101,4 @@ def createJavaObject(sc, obj_type):
         jar_file_name = _getJarFileName(sc, '-extra')
         x = _getLoaderInstance(sc, jar_file_name, 'org.apache.sysml.api.dl.Caffe2DMLLoader', hint + 'systemml-*-extra.jar')
         x.loadCaffe2DML(jar_file_name)
-    return ret
\ No newline at end of file
+    return ret


http://git-wip-us.apache.org/repos/asf/systemml/blob/b34079a2/src/main/python/systemml/mlcontext.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/mlcontext.py b/src/main/python/systemml/mlcontext.py
index 60705c5..4a555f7 100644
--- a/src/main/python/systemml/mlcontext.py
+++ b/src/main/python/systemml/mlcontext.py
@@ -36,11 +36,7 @@
 try:
     from pyspark import SparkContext
     from pyspark.conf import SparkConf
     import pyspark.mllib.common
-    # -----------------------------------------------------------------------------------
-    # Avoids race condition between locking of metastore_db of Scala SparkSession and PySpark SparkSession
     from pyspark.sql import SparkSession
-    SparkSession.builder.getOrCreate().createDataFrame(pd.DataFrame(np.array([[1,2],[3,4]])))
-    # -----------------------------------------------------------------------------------
 except ImportError:
     raise ImportError('Unable to import `pyspark`. Hint: Make sure you are running with PySpark.')
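----------------------------------------------------------------------

For reference, a minimal standalone sketch of the lazy-initialization
pattern this commit applies: SparkSession creation is deferred from
import time to the first driver-side call, so PySpark worker processes
that merely import the module never open a session (and never touch
metastore_db). The helper names `_ensure_spark_session` and
`create_mlcontext` are illustrative only, not part of the SystemML API.

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

_initialized = False

def _ensure_spark_session():
    # Hypothetical helper mirroring the guard added to _createJavaObject().
    global _initialized
    if not _initialized:
        _initialized = True
        # Creating a DataFrame once fully initializes the PySpark session,
        # so the Python and Scala sessions do not race to lock metastore_db
        # (the condition described in the commit's comments).
        spark = SparkSession.builder.getOrCreate()
        spark.createDataFrame(pd.DataFrame(np.array([[1, 2], [3, 4]])))

def create_mlcontext(sc):
    _ensure_spark_session()  # runs only on the driver, on first use
    return sc._jvm.org.apache.sysml.api.mlcontext.MLContext(sc._jsc)

Because the guard runs inside the object-creation path rather than at
module import, importing the package in a worker process stays cheap
and side-effect free, which is the point of this change.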