spark git commit: [SPARK-22651][PYTHON][ML] Prevent initiating multiple Hive clients for ImageSchema.readImages
Repository: spark
Updated Branches:
  refs/heads/master ee10ca7ec -> aa4cf2b19


[SPARK-22651][PYTHON][ML] Prevent initiating multiple Hive clients for ImageSchema.readImages

## What changes were proposed in this pull request?

Calling `ImageSchema.readImages` multiple times as below in PySpark shell:

```python
from pyspark.ml.image import ImageSchema

data_path = 'data/mllib/images/kittens'
_ = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True).collect()
_ = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True).collect()
```

throws an error as below:

```
...
org.datanucleus.exceptions.NucleusDataStoreException: Unable to open a test connection to the given database. JDBC url = jdbc:derby:;databaseName=metastore_db;create=true, username = APP. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: --
java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@742f639f, see the next exception for details.
...
	at org.apache.derby.jdbc.AutoloadedDriver.connect(Unknown Source)
...
	at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
...
	at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:180)
...
	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:195)
	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
	at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:194)
	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:100)
	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:88)
	at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:39)
	at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog$lzycompute(HiveSessionStateBuilder.scala:54)
	at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:52)
	at org.apache.spark.sql.hive.HiveSessionStateBuilder$$anon$1.<init>(HiveSessionStateBuilder.scala:69)
	at org.apache.spark.sql.hive.HiveSessionStateBuilder.analyzer(HiveSessionStateBuilder.scala:69)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
	at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:79)
	at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:70)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:51)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:70)
	at org.apache.spark.sql.SparkSession.internalCreateDataFrame(SparkSession.scala:574)
	at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:593)
	at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:348)
	at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:348)
	at org.apache.spark.ml.image.ImageSchema$$anonfun$readImages$2$$anonfun$apply$1.apply(ImageSchema.scala:253)
...
Caused by: ERROR XJ040: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@742f639f, see the next exception for details.
	at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
	at org.apache.derby.impl.jdbc.SQLExceptionFactory.wrapArgsForTransportAcrossDRDA(Unknown Source)
	... 121 more
Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /.../spark/metastore_db.
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/ml/image.py", line 190, in readImages
    dropImageFailures, float(sampleRatio), seed)
  File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'java.lang.RuntimeException: java.lang.
```
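The trace above pins down the root cause: each `readImages` call ends up constructing a fresh `SparkSession`, each new session initiates its own Hive metastore client, and the embedded Derby metastore refuses a second connection (the `ERROR XSDB6` above). Below is a minimal Scala sketch of the session-reuse pattern that prevents this; it is illustrative only, not the PySpark patch itself:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: constructing a session object directly (as the failing PySpark code path did)
// builds a new session unconditionally, and its Hive-backed shared state can boot a
// second Derby metastore client -- the failure mode above. `getOrCreate()` hands back
// the already-active session instead, so the Hive client is created exactly once no
// matter how many times callers ask for a session.
object SessionReuseSketch {
  def currentSession(): SparkSession =
    SparkSession.builder().enableHiveSupport().getOrCreate()

  def main(args: Array[String]): Unit = {
    val first = currentSession()
    val second = currentSession()
    assert(first eq second) // same instance: one shared state, one metastore client
  }
}
```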
spark git commit: [SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus
Repository: spark
Updated Branches:
  refs/heads/master 9d06a9e0c -> ee10ca7ec


[SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus

## What changes were proposed in this pull request?

Use a separate Spark event queue for StreamingQueryListenerBus so that if there are many non-streaming events, streaming query listeners don't need to wait for other Spark listeners and can catch up.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu

Closes #19838 from zsxwing/SPARK-22638.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee10ca7e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee10ca7e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee10ca7e

Branch: refs/heads/master
Commit: ee10ca7ec6cf7fbaab3f95a097b46936d97d0835
Parents: 9d06a9e
Author: Shixiong Zhu
Authored: Fri Dec 1 13:02:03 2017 -0800
Committer: Shixiong Zhu
Committed: Fri Dec 1 13:02:03 2017 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/LiveListenerBus.scala     | 4 +++-
 .../sql/execution/streaming/StreamingQueryListenerBus.scala    | 6 +++++-
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee10ca7e/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 2f93c49..2312140 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -87,7 +87,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
    * of each other (each one uses a separate thread for delivering events), allowing slower
    * listeners to be somewhat isolated from others.
    */
-  private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized {
+  private[spark] def addToQueue(
+      listener: SparkListenerInterface,
+      queue: String): Unit = synchronized {
     if (stopped.get()) {
       throw new IllegalStateException("LiveListenerBus is stopped.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee10ca7e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index 07e3902..7dd491e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
 
   import StreamingQueryListener._
 
-  sparkListenerBus.addToSharedQueue(this)
+  sparkListenerBus.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY)
 
   /**
    * RunIds of active queries whose events are supposed to be forwarded by this ListenerBus
@@ -130,3 +130,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
     }
   }
 }
+
+object StreamingQueryListenerBus {
+  val STREAM_EVENT_QUERY = "streams"
+}
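The isolation this buys can be pictured with a toy model: each named queue owns its own buffer and dispatch thread, so a backlog of non-streaming events on the shared queue cannot delay delivery of streaming-query events. The sketch below is illustrative only (class and names are hypothetical), not Spark's actual AsyncEventQueue machinery:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Toy per-queue listener bus: one buffer plus one daemon dispatch thread per named
// queue, so a slow handler on one queue never blocks handlers on another.
class ToyEventQueue[E](name: String, handler: E => Unit) {
  private val buffer = new LinkedBlockingQueue[E]()
  private val dispatcher = new Thread(s"toy-bus-$name") {
    override def run(): Unit = while (true) handler(buffer.take())
  }
  dispatcher.setDaemon(true)
  dispatcher.start()
  def post(event: E): Unit = buffer.put(event)
}

object ToyBusDemo extends App {
  // Shared queue with a deliberately slow listener, mimicking a busy non-streaming listener.
  val shared = new ToyEventQueue[String]("shared", e => { Thread.sleep(100); println(s"shared: $e") })
  // Dedicated "streams" queue, as this commit introduces for StreamingQueryListenerBus.
  val streams = new ToyEventQueue[String]("streams", e => println(s"streams: $e"))

  (1 to 1000).foreach(i => shared.post(s"task-event-$i")) // the shared queue backs up...
  streams.post("query-progress")                          // ...but this is delivered immediately
  Thread.sleep(500)                                       // let the dispatchers run briefly
}
```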
spark git commit: [SPARK-22393][SPARK-SHELL] spark-shell can't find imported types in class constructors, extends clause
Repository: spark
Updated Branches:
  refs/heads/master 16adaf634 -> 9d06a9e0c


[SPARK-22393][SPARK-SHELL] spark-shell can't find imported types in class constructors, extends clause

## What changes were proposed in this pull request?

[SPARK-22393](https://issues.apache.org/jira/browse/SPARK-22393)

## How was this patch tested?

With a new test case in `ReplSuite`.

This code is a retrofit of the Scala [SI-9881](https://github.com/scala/bug/issues/9881) bug fix, which never made it into the Scala 2.11 branches. Pushing these changes directly to the Scala repo is not practical (see: https://github.com/scala/scala/pull/6195).

Author: Mark Petruska

Closes #19846 from mpetruska/SPARK-22393.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d06a9e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d06a9e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d06a9e0

Branch: refs/heads/master
Commit: 9d06a9e0cf05af99ba210fabae1e77eccfce7986
Parents: 16adaf6
Author: Mark Petruska
Authored: Fri Dec 1 05:14:12 2017 -0600
Committer: Sean Owen
Committed: Fri Dec 1 05:14:12 2017 -0600

----------------------------------------------------------------------
 .../org/apache/spark/repl/SparkExprTyper.scala  |  74 +
 .../org/apache/spark/repl/SparkILoop.scala      |   4 +
 .../spark/repl/SparkILoopInterpreter.scala      | 103 +++
 .../scala/org/apache/spark/repl/ReplSuite.scala |  10 ++
 4 files changed, 191 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9d06a9e0/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
new file mode 100644
index 0000000..724ce9a
--- /dev/null
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import scala.tools.nsc.interpreter.{ExprTyper, IR}
+
+trait SparkExprTyper extends ExprTyper {
+
+  import repl._
+  import global.{reporter => _, Import => _, _}
+  import naming.freshInternalVarName
+
+  def doInterpret(code: String): IR.Result = {
+    // interpret/interpretSynthetic may change the phase,
+    // which would have unintended effects on types.
+    val savedPhase = phase
+    try interpretSynthetic(code) finally phase = savedPhase
+  }
+
+  override def symbolOfLine(code: String): Symbol = {
+    def asExpr(): Symbol = {
+      val name = freshInternalVarName()
+      // Typing it with a lazy val would give us the right type, but runs
+      // into compiler bugs with things like existentials, so we compile it
+      // behind a def and strip the NullaryMethodType which wraps the expr.
+      val line = "def " + name + " = " + code
+
+      doInterpret(line) match {
+        case IR.Success =>
+          val sym0 = symbolOfTerm(name)
+          // drop NullaryMethodType
+          sym0.cloneSymbol setInfo exitingTyper(sym0.tpe_*.finalResultType)
+        case _ => NoSymbol
+      }
+    }
+
+    def asDefn(): Symbol = {
+      val old = repl.definedSymbolList.toSet
+
+      doInterpret(code) match {
+        case IR.Success =>
+          repl.definedSymbolList filterNot old match {
+            case Nil => NoSymbol
+            case sym :: Nil => sym
+            case syms => NoSymbol.newOverloaded(NoPrefix, syms)
+          }
+        case _ => NoSymbol
+      }
+    }
+
+    def asError(): Symbol = {
+      doInterpret(code)
+      NoSymbol
+    }
+
+    beSilentDuring(asExpr()) orElse beSilentDuring(asDefn()) orElse asError()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9d06a9e0/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src
spark git commit: [SPARK-22635][SQL][ORC] FileNotFoundException while reading ORC files containing special characters
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 ba00bd961 -> f3f8c8767


[SPARK-22635][SQL][ORC] FileNotFoundException while reading ORC files containing special characters

## What changes were proposed in this pull request?

SPARK-22146 fixed the FileNotFoundException issue only for the `inferSchema` method, i.e. only for schema inference, but it didn't fix the problem when actually reading the data. Thus nearly the same exception happens when someone tries to use the data. This PR fixes the problem there as well.

## How was this patch tested?

Enhanced UT.

Author: Marco Gaido

Closes #19844 from mgaido91/SPARK-22635.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3f8c876
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3f8c876
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3f8c876

Branch: refs/heads/branch-2.2
Commit: f3f8c8767efbe8c941b4181f71587c65a05e1b82
Parents: ba00bd9
Author: Marco Gaido
Authored: Fri Dec 1 01:24:15 2017 +0900
Committer: hyukjinkwon
Committed: Fri Dec 1 18:18:57 2017 +0900

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/orc/OrcFileFormat.scala   | 11 +--
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala      |  3 ++-
 2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f3f8c876/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 54e8f82..2defd31 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -131,10 +131,12 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
 
+      val filePath = new Path(new URI(file.filePath))
+
       // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
       // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
       // using the given physical schema. Instead, we simply return an empty iterator.
-      val isEmptyFile = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)).isEmpty
+      val isEmptyFile = OrcFileOperator.readSchema(Seq(filePath.toString), Some(conf)).isEmpty
       if (isEmptyFile) {
         Iterator.empty
       } else {
@@ -144,15 +146,12 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
         val job = Job.getInstance(conf)
         FileInputFormat.setInputPaths(job, file.filePath)
 
-        val fileSplit = new FileSplit(
-          new Path(new URI(file.filePath)), file.start, file.length, Array.empty
-        )
+        val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty)
 
         // Custom OrcRecordReader is used to get
         // ObjectInspector during recordReader creation itself and can
         // avoid NameNode call in unwrapOrcStructs per file.
         // Specifically would be helpful for partitioned datasets.
-        val orcReader = OrcFile.createReader(
-          new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
+        val orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf))
         new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/f3f8c876/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c0acffb..d62ed19 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1355,7 +1355,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       withTempDir { dir =>
         val tmpFile = s"$dir/$nameWithSpecialChars"
         spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
-        spark.read.format(format).load(tmpFile)
+        val fileContent = spark.read.format(format).load(tmpFile)
+        checkAnswer(fileContent, Seq(Row("a"), Row("b")))
       }
     }
   }
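The essence of the fix is where the `Path` gets built: `PartitionedFile.filePath` carries a URI-encoded string, so constructing a `Path` straight from it keeps escape sequences like `%20` as literal characters, while round-tripping through `java.net.URI` decodes them first. A small sketch of the difference (the file name is hypothetical):

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

object SpecialCharPathSketch extends App {
  // An ORC file under a directory whose name contains a space; Spark hands this
  // path around in URI-encoded form, so the space arrives as "%20".
  val encoded = "file:/tmp/dir%20with%20space/part-00000.orc"

  // Built from the raw string, "%20" stays literal: this Path points at a directory
  // literally named "dir%20with%20space", hence the FileNotFoundException.
  val wrong = new Path(encoded)

  // Decoding through java.net.URI first recovers the real name, "dir with space".
  val right = new Path(new URI(encoded))

  println(wrong.toUri.getPath) // /tmp/dir%20with%20space/part-00000.orc
  println(right.toUri.getPath) // /tmp/dir with space/part-00000.orc
}
```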