spark git commit: [SPARK-22651][PYTHON][ML] Prevent initiating multiple Hive clients for ImageSchema.readImages

2017-12-01 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master ee10ca7ec -> aa4cf2b19


[SPARK-22651][PYTHON][ML] Prevent initiating multiple Hive clients for 
ImageSchema.readImages

## What changes were proposed in this pull request?

Calling `ImageSchema.readImages` multiple times as below in PySpark shell:

```python
from pyspark.ml.image import ImageSchema
data_path = 'data/mllib/images/kittens'
_ = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True).collect()
_ = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True).collect()
```

throws an error as below:

```
...
org.datanucleus.exceptions.NucleusDataStoreException: Unable to open a test connection to the given database. JDBC url = jdbc:derby:;databaseName=metastore_db;create=true, username = APP. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: --
java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1742f639f, see the next exception for details.
...
  at org.apache.derby.jdbc.AutoloadedDriver.connect(Unknown Source)
...
  at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
...
  at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:180)
...
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:195)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
  at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:194)
  at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:100)
  at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:88)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:39)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog$lzycompute(HiveSessionStateBuilder.scala:54)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:52)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder$$anon$1.<init>(HiveSessionStateBuilder.scala:69)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.analyzer(HiveSessionStateBuilder.scala:69)
  at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
  at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
  at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:79)
  at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:70)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:51)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:70)
  at org.apache.spark.sql.SparkSession.internalCreateDataFrame(SparkSession.scala:574)
  at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:593)
  at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:348)
  at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:348)
  at org.apache.spark.ml.image.ImageSchema$$anonfun$readImages$2$$anonfun$apply$1.apply(ImageSchema.scala:253)
...
Caused by: ERROR XJ040: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1742f639f, see the next exception for details.
  at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
  at org.apache.derby.impl.jdbc.SQLExceptionFactory.wrapArgsForTransportAcrossDRDA(Unknown Source)
  ... 121 more
Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /.../spark/metastore_db.
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/ml/image.py", line 190, in readImages
    dropImageFailures, float(sampleRatio), seed)
  File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'java.lang.RuntimeException: java.lang.
```
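For background (not part of the patch itself): the embedded Derby metastore tolerates only one Hive client per JVM, which is why a second client attempt fails with the XSDB6 error above. A minimal Scala sketch of the session-reuse pattern that avoids initiating another metastore client, assuming a Hive-enabled session; names here are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() hands back the already-active session (and its existing Hive
// metastore client) instead of bootstrapping a second Derby-backed metastore,
// which is what triggers the "Another instance of Derby" (XSDB6) error above.
val spark1 = SparkSession.builder().enableHiveSupport().getOrCreate()
val spark2 = SparkSession.builder().enableHiveSupport().getOrCreate()
assert(spark1 eq spark2)  // both references point to the same session
```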

spark git commit: [SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus

2017-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 9d06a9e0c -> ee10ca7ec


[SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus

## What changes were proposed in this pull request?

Use a separate Spark event queue for StreamingQueryListenerBus so that if there 
are many non-streaming events, streaming query listeners don't need to wait for 
other Spark listeners and can catch up.
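
For context, a small Scala sketch (not part of this patch) of how a streaming query listener is registered; with this change, events delivered to such listeners flow through a dedicated "streams" queue instead of the shared Spark listener queue. The listener class name is illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Illustrative listener that just counts progress events.
class CountingListener extends StreamingQueryListener {
  @volatile var progressEvents = 0
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = { progressEvents += 1 }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

val spark = SparkSession.builder().appName("listener-demo").getOrCreate()
// Registered listeners are now served from their own event queue, so a flood
// of non-streaming events does not delay their callbacks.
spark.streams.addListener(new CountingListener)
```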

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #19838 from zsxwing/SPARK-22638.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee10ca7e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee10ca7e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee10ca7e

Branch: refs/heads/master
Commit: ee10ca7ec6cf7fbaab3f95a097b46936d97d0835
Parents: 9d06a9e
Author: Shixiong Zhu 
Authored: Fri Dec 1 13:02:03 2017 -0800
Committer: Shixiong Zhu 
Committed: Fri Dec 1 13:02:03 2017 -0800

--
 .../scala/org/apache/spark/scheduler/LiveListenerBus.scala | 4 +++-
 .../sql/execution/streaming/StreamingQueryListenerBus.scala| 6 +-
 2 files changed, 8 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ee10ca7e/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 2f93c49..2312140 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -87,7 +87,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
    * of each other (each one uses a separate thread for delivering events), allowing slower
    * listeners to be somewhat isolated from others.
    */
-  private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized {
+  private[spark] def addToQueue(
+      listener: SparkListenerInterface,
+      queue: String): Unit = synchronized {
     if (stopped.get()) {
       throw new IllegalStateException("LiveListenerBus is stopped.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee10ca7e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index 07e3902..7dd491e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
 
   import StreamingQueryListener._
 
-  sparkListenerBus.addToSharedQueue(this)
+  sparkListenerBus.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY)
 
   /**
    * RunIds of active queries whose events are supposed to be forwarded by this ListenerBus
@@ -130,3 +130,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
     }
   }
 }
+
+object StreamingQueryListenerBus {
+  val STREAM_EVENT_QUERY = "streams"
+}





spark git commit: [SPARK-22393][SPARK-SHELL] spark-shell can't find imported types in class constructors, extends clause

2017-12-01 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 16adaf634 -> 9d06a9e0c


[SPARK-22393][SPARK-SHELL] spark-shell can't find imported types in class 
constructors, extends clause

## What changes were proposed in this pull request?

[SPARK-22393](https://issues.apache.org/jira/browse/SPARK-22393)

## How was this patch tested?

With a new test case in `ReplSuite`



This code is a retrofit of the Scala 
[SI-9881](https://github.com/scala/bug/issues/9881) bug fix, which never made 
it into the Scala 2.11 branches. Pushing these changes directly to the Scala 
repo is not practical (see: https://github.com/scala/scala/pull/6195).

Author: Mark Petruska 

Closes #19846 from mpetruska/SPARK-22393.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d06a9e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d06a9e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d06a9e0

Branch: refs/heads/master
Commit: 9d06a9e0cf05af99ba210fabae1e77eccfce7986
Parents: 16adaf6
Author: Mark Petruska 
Authored: Fri Dec 1 05:14:12 2017 -0600
Committer: Sean Owen 
Committed: Fri Dec 1 05:14:12 2017 -0600

--
 .../org/apache/spark/repl/SparkExprTyper.scala  |  74 +
 .../org/apache/spark/repl/SparkILoop.scala  |   4 +
 .../spark/repl/SparkILoopInterpreter.scala  | 103 +++
 .../scala/org/apache/spark/repl/ReplSuite.scala |  10 ++
 4 files changed, 191 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9d06a9e0/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
--
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
new file mode 100644
index 000..724ce9a
--- /dev/null
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import scala.tools.nsc.interpreter.{ExprTyper, IR}
+
+trait SparkExprTyper extends ExprTyper {
+
+  import repl._
+  import global.{reporter => _, Import => _, _}
+  import naming.freshInternalVarName
+
+  def doInterpret(code: String): IR.Result = {
+    // interpret/interpretSynthetic may change the phase,
+    // which would have unintended effects on types.
+    val savedPhase = phase
+    try interpretSynthetic(code) finally phase = savedPhase
+  }
+
+  override def symbolOfLine(code: String): Symbol = {
+    def asExpr(): Symbol = {
+      val name = freshInternalVarName()
+      // Typing it with a lazy val would give us the right type, but runs
+      // into compiler bugs with things like existentials, so we compile it
+      // behind a def and strip the NullaryMethodType which wraps the expr.
+      val line = "def " + name + " = " + code
+
+      doInterpret(line) match {
+        case IR.Success =>
+          val sym0 = symbolOfTerm(name)
+          // drop NullaryMethodType
+          sym0.cloneSymbol setInfo exitingTyper(sym0.tpe_*.finalResultType)
+        case _ => NoSymbol
+      }
+    }
+
+    def asDefn(): Symbol = {
+      val old = repl.definedSymbolList.toSet
+
+      doInterpret(code) match {
+        case IR.Success =>
+          repl.definedSymbolList filterNot old match {
+            case Nil => NoSymbol
+            case sym :: Nil => sym
+            case syms => NoSymbol.newOverloaded(NoPrefix, syms)
+          }
+        case _ => NoSymbol
+      }
+    }
+
+    def asError(): Symbol = {
+      doInterpret(code)
+      NoSymbol
+    }
+
+    beSilentDuring(asExpr()) orElse beSilentDuring(asDefn()) orElse asError()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9d06a9e0/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
--
diff --git 
a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala 
b/repl/scala-2.11/src

spark git commit: [SPARK-22635][SQL][ORC] FileNotFoundException while reading ORC files containing special characters

2017-12-01 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 ba00bd961 -> f3f8c8767


[SPARK-22635][SQL][ORC] FileNotFoundException while reading ORC files 
containing special characters

## What changes were proposed in this pull request?

SPARK-22146 fixed the FileNotFoundException issue only for the `inferSchema` method, i.e. only for schema inference, but it did not fix the problem when actually reading the data. Thus, nearly the same exception happens when someone tries to use the data. This PR fixes the problem for the read path as well.
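
A hedged usage sketch of the scenario being fixed (paths and names here are illustrative, not the exact test case): writing an ORC dataset to a directory whose name contains characters that need URI escaping, then reading it back, which previously failed at read time even after the schema-inference fix in SPARK-22146.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("orc-special-chars").enableHiveSupport().getOrCreate()
import spark.implicits._

// Directory name containing a space and a percent sign; illustrative only.
val path = "/tmp/orc test %dir"

spark.createDataset(Seq("a", "b")).write.format("orc").save(path)

// Before this fix, the load below could still fail with FileNotFoundException
// when the path needed URI decoding; the patch routes the read through
// new Path(new URI(file.filePath)) instead of the raw path string.
spark.read.format("orc").load(path).show()
```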

## How was this patch tested?

enhanced UT

Author: Marco Gaido 

Closes #19844 from mgaido91/SPARK-22635.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3f8c876
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3f8c876
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3f8c876

Branch: refs/heads/branch-2.2
Commit: f3f8c8767efbe8c941b4181f71587c65a05e1b82
Parents: ba00bd9
Author: Marco Gaido 
Authored: Fri Dec 1 01:24:15 2017 +0900
Committer: hyukjinkwon 
Committed: Fri Dec 1 18:18:57 2017 +0900

--
 .../org/apache/spark/sql/hive/orc/OrcFileFormat.scala| 11 +--
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala   |  3 ++-
 2 files changed, 7 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f3f8c876/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 54e8f82..2defd31 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -131,10 +131,12 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
 
+      val filePath = new Path(new URI(file.filePath))
+
       // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
       // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
       // using the given physical schema. Instead, we simply return an empty iterator.
-      val isEmptyFile = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)).isEmpty
+      val isEmptyFile = OrcFileOperator.readSchema(Seq(filePath.toString), Some(conf)).isEmpty
       if (isEmptyFile) {
         Iterator.empty
       } else {
@@ -144,15 +146,12 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
           val job = Job.getInstance(conf)
           FileInputFormat.setInputPaths(job, file.filePath)
 
-          val fileSplit = new FileSplit(
-            new Path(new URI(file.filePath)), file.start, file.length, Array.empty
-          )
+          val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty)
           // Custom OrcRecordReader is used to get
           // ObjectInspector during recordReader creation itself and can
           // avoid NameNode call in unwrapOrcStructs per file.
           // Specifically would be helpful for partitioned datasets.
-          val orcReader = OrcFile.createReader(
-            new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
+          val orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf))
           new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength)
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f3f8c876/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
--
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c0acffb..d62ed19 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1355,7 +1355,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       withTempDir { dir =>
         val tmpFile = s"$dir/$nameWithSpecialChars"
         spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
-        spark.read.format(format).load(tmpFile)
+        val fileContent = spark.read.format(format).load(tmpFile)
+        checkAnswer(fileContent, Seq(Row("a"), Row("b")))
       }
     }
   }

