Repository: spark Updated Branches: refs/heads/master ee10ca7ec -> aa4cf2b19
[SPARK-22651][PYTHON][ML] Prevent initiating multiple Hive clients for ImageSchema.readImages ## What changes were proposed in this pull request? Calling `ImageSchema.readImages` multiple times as below in PySpark shell: ```python from pyspark.ml.image import ImageSchema data_path = 'data/mllib/images/kittens' _ = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True).collect() _ = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True).collect() ``` throws an error as below: ``` ... org.datanucleus.exceptions.NucleusDataStoreException: Unable to open a test connection to the given database. JDBC url = jdbc:derby:;databaseName=metastore_db;create=true, username = APP. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------ java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1742f639f, see the next exception for details. ... at org.apache.derby.jdbc.AutoloadedDriver.connect(Unknown Source) ... at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762) ... at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:180) ... at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:195) at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195) at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195) at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97) at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:194) at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:100) at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:88) at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:39) at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog$lzycompute(HiveSessionStateBuilder.scala:54) at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:52) at org.apache.spark.sql.hive.HiveSessionStateBuilder$$anon$1.<init>(HiveSessionStateBuilder.scala:69) at org.apache.spark.sql.hive.HiveSessionStateBuilder.analyzer(HiveSessionStateBuilder.scala:69) at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293) at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293) at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:79) at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:79) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:70) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:68) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:51) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:70) at org.apache.spark.sql.SparkSession.internalCreateDataFrame(SparkSession.scala:574) at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:593) at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:348) at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:348) at org.apache.spark.ml.image.ImageSchema$$anonfun$readImages$2$$anonfun$apply$1.apply(ImageSchema.scala:253) ... Caused by: ERROR XJ040: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1742f639f, see the next exception for details. at org.apache.derby.iapi.error.StandardException.newException(Unknown Source) at org.apache.derby.impl.jdbc.SQLExceptionFactory.wrapArgsForTransportAcrossDRDA(Unknown Source) ... 121 more Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /.../spark/metastore_db. ... Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/ml/image.py", line 190, in readImages dropImageFailures, float(sampleRatio), seed) File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 69, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;' ``` Seems we better stick to `SparkSession.builder.getOrCreate()` like: https://github.com/apache/spark/blob/51620e288b5e0a7fffc3899c9deadabace28e6d7/python/pyspark/sql/streaming.py#L329 https://github.com/apache/spark/blob/dc5d34d8dcd6526d1dfdac8606661561c7576a62/python/pyspark/sql/column.py#L541 https://github.com/apache/spark/blob/33d43bf1b6f55594187066f0e38ba3985fa2542b/python/pyspark/sql/readwriter.py#L105 ## How was this patch tested? This was tested as below in PySpark shell: ```python from pyspark.ml.image import ImageSchema data_path = 'data/mllib/images/kittens' _ = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True).collect() _ = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True).collect() ``` Author: hyukjinkwon <gurwls...@gmail.com> Closes #19845 from HyukjinKwon/SPARK-22651. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa4cf2b1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa4cf2b1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa4cf2b1 Branch: refs/heads/master Commit: aa4cf2b19e4cf5588af7e2192e0e9f687cd84bc5 Parents: ee10ca7 Author: hyukjinkwon <gurwls...@gmail.com> Authored: Sat Dec 2 11:55:43 2017 +0900 Committer: hyukjinkwon <gurwls...@gmail.com> Committed: Sat Dec 2 11:55:43 2017 +0900 ---------------------------------------------------------------------- python/pyspark/ml/image.py | 5 ++--- python/pyspark/ml/tests.py | 31 ++++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/aa4cf2b1/python/pyspark/ml/image.py ---------------------------------------------------------------------- diff --git a/python/pyspark/ml/image.py b/python/pyspark/ml/image.py index 2b61aa9..384599d 100644 --- a/python/pyspark/ml/image.py +++ b/python/pyspark/ml/image.py @@ -201,9 +201,8 @@ class _ImageSchema(object): .. versionadded:: 2.3.0 """ - ctx = SparkContext._active_spark_context - spark = SparkSession(ctx) - image_schema = ctx._jvm.org.apache.spark.ml.image.ImageSchema + spark = SparkSession.builder.getOrCreate() + image_schema = spark._jvm.org.apache.spark.ml.image.ImageSchema jsession = spark._jsparkSession jresult = image_schema.readImages(path, jsession, recursive, numPartitions, dropImageFailures, float(sampleRatio), seed) http://git-wip-us.apache.org/repos/asf/spark/blob/aa4cf2b1/python/pyspark/ml/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 89ef555..3a0b816 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -67,7 +67,7 @@ from pyspark.ml.tuning import * from pyspark.ml.util import * from pyspark.ml.wrapper import JavaParams, JavaWrapper from pyspark.serializers import PickleSerializer -from pyspark.sql import DataFrame, Row, SparkSession +from pyspark.sql import DataFrame, Row, SparkSession, HiveContext from pyspark.sql.functions import rand from pyspark.sql.types import DoubleType, IntegerType from pyspark.storagelevel import * @@ -1855,6 +1855,35 @@ class ImageReaderTest(SparkSessionTestCase): lambda: ImageSchema.toImage("a")) +class ImageReaderTest2(PySparkTestCase): + + @classmethod + def setUpClass(cls): + PySparkTestCase.setUpClass() + # Note that here we enable Hive's support. + try: + cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf() + except py4j.protocol.Py4JError: + cls.tearDownClass() + raise unittest.SkipTest("Hive is not available") + except TypeError: + cls.tearDownClass() + raise unittest.SkipTest("Hive is not available") + cls.spark = HiveContext._createForTesting(cls.sc) + + @classmethod + def tearDownClass(cls): + PySparkTestCase.tearDownClass() + cls.spark.sparkSession.stop() + + def test_read_images_multiple_times(self): + # This test case is to check if `ImageSchema.readImages` tries to + # initiate Hive client multiple times. See SPARK-22651. + data_path = 'data/mllib/images/kittens' + ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True) + ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True) + + class ALSTest(SparkSessionTestCase): def test_storage_levels(self): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org