[spark] branch master updated: [SPARK-28012][SQL] Hive UDF supports struct type foldable expression

2019-06-19 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d9697fe  [SPARK-28012][SQL] Hive UDF supports struct type foldable 
expression
d9697fe is described below

commit d9697fedf5a2fa56e25849b0715d48ac8e5345f5
Author: sychen 
AuthorDate: Thu Jun 20 14:36:01 2019 +0900

[SPARK-28012][SQL] Hive UDF supports struct type foldable expression

## What changes were proposed in this pull request?

Currently, when a Hive UDF is called with a struct-type foldable argument, an exception is thrown:

No handler for Hive UDF 'xxxUDF': java.lang.RuntimeException: Hive doesn't 
support the constant type [StructType(StructField(name,StringType,true), 
StructField(value,DecimalType(3,1),true))]
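
For context, a minimal reproduction of the pattern this patch enables, mirroring the new test below (`spark` is assumed to be a Hive-enabled SparkSession; `GenericUDFArray` merely stands in for any Hive generic UDF):

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFArray

// Register a Hive generic UDF and pass it a foldable struct literal.
// Before this patch, converting the struct constant to an ObjectInspector
// failed with the "Hive doesn't support the constant type" error above.
spark.sql(s"CREATE FUNCTION testUDFStructType AS '${classOf[GenericUDFArray].getName}'")
spark.sql("SELECT testUDFStructType(named_struct('name', 'xx', 'value', 1))[0].value").show()
```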

## How was this patch tested?
added new UT

Closes #24846 from cxzl25/hive_udf_literal_struct_type.

Authored-by: sychen 
Signed-off-by: HyukjinKwon 
---
 .../main/scala/org/apache/spark/sql/hive/HiveInspectors.scala |  2 ++
 .../org/apache/spark/sql/hive/execution/HiveUDFSuite.scala| 11 +++
 2 files changed, 13 insertions(+)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 178fced..33b5bce 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -847,6 +847,8 @@ private[hive] trait HiveInspectors {
 
 ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, 
valueOI, jmap)
   }
+case Literal(_, dt: StructType) =>
+  toInspector(dt)
 // We will enumerate all of the possible constant expressions, throw 
exception if we missed
 case Literal(_, dt) => sys.error(s"Hive doesn't support the constant type 
[$dt].")
 // ideally, we don't test the foldable here(but in optimizer), however, 
some of the
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 446267d..587eab4 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -652,6 +652,17 @@ class HiveUDFSuite extends QueryTest with 
TestHiveSingleton with SQLTestUtils {
   }
 }
   }
+  test("SPARK-28012 Hive UDF supports struct type foldable expression") {
+withUserDefinedFunction("testUDFStructType" -> false) {
+  // Simulate a hive udf that supports struct parameters
+  sql("CREATE FUNCTION testUDFStructType AS '" +
+s"${classOf[GenericUDFArray].getName}'")
+  checkAnswer(
+sql("SELECT testUDFStructType(named_struct('name', 'xx', 'value', 
1))[0].value"),
+Seq(Row(1)))
+}
+  }
+
 }
 
 class TestPair(x: Int, y: Int) extends Writable with Serializable {





[spark] branch master updated: [SPARK-23263][TEST] CTAS should update stat if autoUpdate statistics is enabled

2019-06-19 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4968f87  [SPARK-23263][TEST] CTAS should update stat if autoUpdate 
statistics is enabled
4968f87 is described below

commit 4968f871685c23b83690a9a1490ed886a3417d93
Author: Yuming Wang 
AuthorDate: Thu Jun 20 14:19:10 2019 +0900

[SPARK-23263][TEST] CTAS should update stat if autoUpdate statistics is 
enabled

## What changes were proposed in this pull request?
[SPARK-27403](https://issues.apache.org/jira/browse/SPARK-27403) fixed the issue, reported in 
[SPARK-23263](https://issues.apache.org/jira/browse/SPARK-23263), that CTAS could not update 
statistics even when `spark.sql.statistics.size.autoUpdate.enabled` is enabled.

This PR adds tests for that fix.
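
A rough interactive sketch of the behavior the new test exercises (the table name is illustrative; `spark` is assumed to be a Hive-enabled SparkSession):

```scala
// With size auto-update enabled, a CTAS should leave size statistics in the catalog.
spark.conf.set("spark.sql.statistics.size.autoUpdate.enabled", "true")
spark.sql("CREATE TABLE spark_23263 STORED AS parquet AS SELECT 'a' AS c1, 'b' AS c2")

// The extended description should now include a Statistics entry for the table.
spark.sql("DESCRIBE TABLE EXTENDED spark_23263").show(100, truncate = false)
```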

## How was this patch tested?

N/A

Closes #20430 from wangyum/SPARK-23263.

Authored-by: Yuming Wang 
Signed-off-by: HyukjinKwon 
---
 .../apache/spark/sql/hive/StatisticsSuite.scala| 22 ++
 1 file changed, 22 insertions(+)

diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 483bd37..7a8e257 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1431,4 +1431,26 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
   assert(catalogStats.rowCount.isEmpty)
 }
   }
+
+  test(s"CTAS should update statistics if 
${SQLConf.AUTO_SIZE_UPDATE_ENABLED.key} is enabled") {
+val tableName = "SPARK_23263"
+Seq(false, true).foreach { isConverted =>
+  Seq(false, true).foreach { updateEnabled =>
+withSQLConf(
+  SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> updateEnabled.toString,
+  HiveUtils.CONVERT_METASTORE_PARQUET.key -> isConverted.toString) {
+  withTable(tableName) {
+sql(s"CREATE TABLE $tableName STORED AS parquet AS SELECT 'a', 
'b'")
+val catalogTable = getCatalogTable(tableName)
+// Hive serde tables always update statistics by Hive metastore
+if (!isConverted || updateEnabled) {
+  assert(catalogTable.stats.nonEmpty)
+} else {
+  assert(catalogTable.stats.isEmpty)
+}
+  }
+}
+  }
+}
+  }
 }





[spark] branch master updated: [SPARK-28089][SQL] File source v2: support reading output of file streaming Sink

2019-06-19 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f510761  [SPARK-28089][SQL] File source v2: support reading output of 
file streaming Sink
f510761 is described below

commit f5107614d6e38d758d9d17f2dc5d57bc9c8918a1
Author: Gengliang Wang 
AuthorDate: Thu Jun 20 12:57:13 2019 +0800

[SPARK-28089][SQL] File source v2: support reading output of file streaming 
Sink

## What changes were proposed in this pull request?

File source V1 supports reading the output of FileStreamSink as a batch query 
(https://github.com/apache/spark/pull/11897). We should support this in file source V2 as well: 
when reading from paths, we first check whether a FileStreamSink metadata log exists. If it does, 
we use `MetadataLogFileIndex` for listing files; otherwise, we use `InMemoryFileIndex`.
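
A rough end-to-end sketch of what this enables (paths are placeholders, and the chosen format is assumed to be routed to the V2 `FileTable` code path):

```scala
// 1) A streaming query writes to a directory through FileStreamSink, which also
//    maintains a _spark_metadata log under that directory.
val query = spark.readStream.format("rate").load()
  .selectExpr("CAST(value AS STRING) AS v")
  .writeStream
  .format("json")
  .option("checkpointLocation", "/tmp/spark-28089/checkpoint")
  .start("/tmp/spark-28089/output")

// 2) ... after some batches have been committed, a plain batch read of the same
//    directory now lists files from the metadata log (MetadataLogFileIndex)
//    instead of scanning the filesystem (InMemoryFileIndex).
val batch = spark.read.format("json").load("/tmp/spark-28089/output")
batch.show()
```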

## How was this patch tested?

Unit test

Closes #24900 from gengliangwang/FileStreamV2.

Authored-by: Gengliang Wang 
Signed-off-by: Wenchen Fan 
---
 .../sql/execution/datasources/v2/FileTable.scala   |  47 +++--
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 189 +
 .../apache/spark/sql/streaming/StreamSuite.scala   |  11 +-
 .../streaming/StreamingDeduplicationSuite.scala|  50 +++---
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  87 +-
 5 files changed, 223 insertions(+), 161 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 3b0cde5..4483f5b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -20,11 +20,12 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.streaming.{FileStreamSink, 
MetadataLogFileIndex}
 import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, 
TableCapability}
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -44,23 +45,37 @@ abstract class FileTable(
 val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
 // Hadoop Configurations are case sensitive.
 val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
-val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, 
hadoopConf,
-  checkEmptyGlobPath = true, checkFilesExist = true)
-val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-new InMemoryFileIndex(
-  sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, 
fileStatusCache)
+if (FileStreamSink.hasMetadata(paths, hadoopConf, 
sparkSession.sessionState.conf)) {
+  // We are reading from the results of a streaming query. We will load 
files from
+  // the metadata log instead of listing them using HDFS APIs.
+  new MetadataLogFileIndex(sparkSession, new Path(paths.head),
+options.asScala.toMap, userSpecifiedSchema)
+} else {
+  // This is a non-streaming file based datasource.
+  val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, 
hadoopConf,
+checkEmptyGlobPath = true, checkFilesExist = true)
+  val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+  new InMemoryFileIndex(
+sparkSession, rootPathsSpecified, caseSensitiveMap, 
userSpecifiedSchema, fileStatusCache)
+}
   }
 
-  lazy val dataSchema: StructType = userSpecifiedSchema.map { schema =>
-val partitionSchema = fileIndex.partitionSchema
-val resolver = sparkSession.sessionState.conf.resolver
-StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
-  }.orElse {
-inferSchema(fileIndex.allFiles())
-  }.getOrElse {
-throw new AnalysisException(
-  s"Unable to infer schema for $formatName. It must be specified 
manually.")
-  }.asNullable
+  lazy val dataSchema: StructType = {
+val schema = userSpecifiedSchema.map { schema =>
+  val partitionSchema = fileIndex.partitionSchema
+  val resolver = sparkSession.sessionState.conf.resolver
+  StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
+}.orElse {
+  inferSchema(fileIndex.allFiles())
+}.getOrElse {
+  throw new AnalysisException(
+s"Unable to infer schema for $formatName. It must be s

[spark] branch master updated: [SPARK-27990][SQL][ML] Provide a way to recursively load data from datasource

2019-06-19 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b276788  [SPARK-27990][SQL][ML] Provide a way to recursively load data 
from datasource
b276788 is described below

commit b276788d57b270d455ef6a7c5ed6cf8a74885dde
Author: WeichenXu 
AuthorDate: Thu Jun 20 12:43:01 2019 +0800

[SPARK-27990][SQL][ML] Provide a way to recursively load data from 
datasource

## What changes were proposed in this pull request?

Provide a way to recursively load data from a datasource by adding a "recursiveFileLookup" 
option.

When the "recursiveFileLookup" option is turned on, partition inference is turned off and all 
files under the directory are loaded recursively.

If a datasource explicitly specifies a partitionSpec and the user turns on the 
"recursiveFileLookup" option, an exception is thrown.
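
A minimal usage sketch (the input path is a placeholder):

```scala
// Load every JSON file under /tmp/data and all of its subdirectories; partition
// inference is skipped, and combining this option with an explicit partition
// spec raises an IllegalArgumentException.
val df = spark.read
  .format("json")
  .option("recursiveFileLookup", "true")
  .load("/tmp/data")
```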

## How was this patch tested?

Unit tests.

Closes #24830 from WeichenXu123/recursive_ds.

Authored-by: WeichenXu 
Signed-off-by: Wenchen Fan 
---
 .../ml/source/image/ImageFileFormatSuite.scala |  1 +
 .../datasources/PartitioningAwareFileIndex.scala   | 48 +--
 .../spark/sql/FileBasedDataSourceSuite.scala   | 70 ++
 3 files changed, 101 insertions(+), 18 deletions(-)

diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
index 38e2513..38bb246 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
@@ -30,6 +30,7 @@ class ImageFileFormatSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 
   // Single column of images named "image"
   private lazy val imagePath = "../data/mllib/images/partitioned"
+  private lazy val recursiveImagePath = "../data/mllib/images"
 
   test("image datasource count test") {
 val df1 = spark.read.format("image").load(imagePath)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 3c93255..3adec2f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -62,6 +62,10 @@ abstract class PartitioningAwareFileIndex(
 pathGlobFilter.forall(_.accept(file.getPath))
   }
 
+  protected lazy val recursiveFileLookup = {
+parameters.getOrElse("recursiveFileLookup", "false").toBoolean
+  }
+
   override def listFiles(
   partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): 
Seq[PartitionDirectory] = {
 def isNonEmptyFile(f: FileStatus): Boolean = {
@@ -70,6 +74,10 @@ abstract class PartitioningAwareFileIndex(
 val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
   PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) 
:: Nil
 } else {
+  if (recursiveFileLookup) {
+throw new IllegalArgumentException(
+  "Datasource with partition do not allow recursive file loading.")
+  }
   prunePartitions(partitionFilters, partitionSpec()).map {
 case PartitionPath(values, path) =>
   val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
@@ -95,7 +103,7 @@ abstract class PartitioningAwareFileIndex(
   override def sizeInBytes: Long = allFiles().map(_.getLen).sum
 
   def allFiles(): Seq[FileStatus] = {
-val files = if (partitionSpec().partitionColumns.isEmpty) {
+val files = if (partitionSpec().partitionColumns.isEmpty && 
!recursiveFileLookup) {
   // For each of the root input paths, get the list of files inside them
   rootPaths.flatMap { path =>
 // Make the path qualified (consistent with listLeafFiles and 
bulkListLeafFiles).
@@ -128,23 +136,27 @@ abstract class PartitioningAwareFileIndex(
   }
 
   protected def inferPartitioning(): PartitionSpec = {
-// We use leaf dirs containing data files to discover the schema.
-val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
-  files.exists(f => isDataPath(f.getPath))
-}.keys.toSeq
-
-val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
-val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
-  .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-PartitioningUtils.parsePartitions(
-  leafDirs,
-  typeInference = 
sparkSession.sessionSta

[spark] branch master updated: [SPARK-28112][TEST] Fix Kryo exception perf. bottleneck in tests due to absence of ML/MLlib classes

2019-06-19 Thread joshrosen
This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ec032ce  [SPARK-28112][TEST] Fix Kryo exception perf. bottleneck in 
tests due to absence of ML/MLlib classes
ec032ce is described below

commit ec032cea4f91a5ee6ce51e2216de23104486a053
Author: Josh Rosen 
AuthorDate: Wed Jun 19 19:06:22 2019 -0700

[SPARK-28112][TEST] Fix Kryo exception perf. bottleneck in tests due to 
absence of ML/MLlib classes

## What changes were proposed in this pull request?
In a nutshell, it looks like the absence of ML / MLlib classes on the classpath causes code in 
KryoSerializer to throw and catch ClassNotFoundExceptions whenever instantiating a new 
serializer in newInstance(). This isn't a performance problem in production (since MLlib is on 
the classpath there), but it's a huge issue in tests and appears to account for an enormous 
amount of test time.

We can address this problem by reducing the total number of 
ClassNotFoundExceptions by performing the class existence checks once and 
storing the results in KryoSerializer instances rather than repeating the 
checks on each newInstance() call.
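
The general shape of the fix, as a standalone sketch (not Spark's exact code; the candidate class names are just examples of optionally-present classes):

```scala
object OptionalKryoClasses {
  private val candidateNames = Seq(
    "org.apache.spark.ml.linalg.DenseVector",
    "org.apache.spark.mllib.linalg.DenseVector")

  // Resolved once on first use; misses are filtered out here rather than being
  // re-discovered (via thrown ClassNotFoundExceptions) on every newInstance() call.
  lazy val loadable: Seq[Class[_]] = candidateNames.flatMap { name =>
    try Some(Class.forName(name)) catch { case _: ClassNotFoundException => None }
  }
}
```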

## How was this patch tested?
The existing tests.

Authored-by: Josh Rosen 

Closes #24916 from gatorsmile/kryoException.

Lead-authored-by: Josh Rosen 
Co-authored-by: gatorsmile 
Signed-off-by: Josh Rosen 
---
 .../apache/spark/serializer/KryoSerializer.scala   | 78 +-
 1 file changed, 45 insertions(+), 33 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 3969106..20774c8 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -212,40 +212,8 @@ class KryoSerializer(conf: SparkConf)
 
 // We can't load those class directly in order to avoid unnecessary jar 
dependencies.
 // We load them safely, ignore it if the class not found.
-Seq(
-  "org.apache.spark.sql.catalyst.expressions.UnsafeRow",
-  "org.apache.spark.sql.catalyst.expressions.UnsafeArrayData",
-  "org.apache.spark.sql.catalyst.expressions.UnsafeMapData",
-
-  "org.apache.spark.ml.attribute.Attribute",
-  "org.apache.spark.ml.attribute.AttributeGroup",
-  "org.apache.spark.ml.attribute.BinaryAttribute",
-  "org.apache.spark.ml.attribute.NominalAttribute",
-  "org.apache.spark.ml.attribute.NumericAttribute",
-
-  "org.apache.spark.ml.feature.Instance",
-  "org.apache.spark.ml.feature.LabeledPoint",
-  "org.apache.spark.ml.feature.OffsetInstance",
-  "org.apache.spark.ml.linalg.DenseMatrix",
-  "org.apache.spark.ml.linalg.DenseVector",
-  "org.apache.spark.ml.linalg.Matrix",
-  "org.apache.spark.ml.linalg.SparseMatrix",
-  "org.apache.spark.ml.linalg.SparseVector",
-  "org.apache.spark.ml.linalg.Vector",
-  "org.apache.spark.ml.stat.distribution.MultivariateGaussian",
-  "org.apache.spark.ml.tree.impl.TreePoint",
-  "org.apache.spark.mllib.clustering.VectorWithNorm",
-  "org.apache.spark.mllib.linalg.DenseMatrix",
-  "org.apache.spark.mllib.linalg.DenseVector",
-  "org.apache.spark.mllib.linalg.Matrix",
-  "org.apache.spark.mllib.linalg.SparseMatrix",
-  "org.apache.spark.mllib.linalg.SparseVector",
-  "org.apache.spark.mllib.linalg.Vector",
-  "org.apache.spark.mllib.regression.LabeledPoint",
-  "org.apache.spark.mllib.stat.distribution.MultivariateGaussian"
-).foreach { name =>
+KryoSerializer.loadableSparkClasses.foreach { clazz =>
   try {
-val clazz = Utils.classForName(name)
 kryo.register(clazz)
   } catch {
 case NonFatal(_) => // do nothing
@@ -516,6 +484,50 @@ private[serializer] object KryoSerializer {
   }
 }
   )
+
+  // classForName() is expensive in case the class is not found, so we filter 
the list of
+  // SQL / ML / MLlib classes once and then re-use that filtered list in 
newInstance() calls.
+  private lazy val loadableSparkClasses: Seq[Class[_]] = {
+Seq(
+  "org.apache.spark.sql.catalyst.expressions.UnsafeRow",
+  "org.apache.spark.sql.catalyst.expressions.UnsafeArrayData",
+  "org.apache.spark.sql.catalyst.expressions.UnsafeMapData",
+
+  "org.apache.spark.ml.attribute.Attribute",
+  "org.apache.spark.ml.attribute.AttributeGroup",
+  "org.apache.spark.ml.attribute.BinaryAttribute",
+  "org.apache.spark.ml.attribute.NominalAttribute",
+  "org.apache.spark.ml.attribute.NumericAttribute",
+
+  "org.apache.spark.ml.feature.Instance",
+  "org.apache.spark.ml.feature.LabeledPoint",
+  "org.apache.spark.ml.feature.OffsetInstance",

[spark] branch branch-2.4 updated: [SPARK-26555][SQL][BRANCH-2.4] make ScalaReflection subtype checking thread safe

2019-06-19 Thread joshrosen
This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new ba7f61e  [SPARK-26555][SQL][BRANCH-2.4] make ScalaReflection subtype 
checking thread safe
ba7f61e is described below

commit ba7f61e25d58aa379f94a23b03503a25574529bc
Author: mwlon 
AuthorDate: Wed Jun 19 19:03:35 2019 -0700

[SPARK-26555][SQL][BRANCH-2.4] make ScalaReflection subtype checking thread 
safe

This is a Spark 2.4.x backport of #24085. Original description follows 
below:

## What changes were proposed in this pull request?

Make ScalaReflection subtype checking thread safe by adding a lock. There is a thread-safety 
bug in the `<:<` operator in all current versions of Scala 
(https://github.com/scala/bug/issues/10766).
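
The workaround, as a small self-contained sketch mirroring the `ScalaSubtypeLock` pattern in the diff below:

```scala
import scala.reflect.runtime.universe._

// Funnel every <:< check through one shared lock, since the operator itself is
// not thread safe in current Scala versions (2.11.12, 2.12.8, 2.13.0-M5).
object SubtypeCheck {
  private object Lock
  def isSubtype(t1: Type, t2: Type): Boolean = Lock.synchronized { t1 <:< t2 }
}

// Safe to call concurrently from many threads:
// SubtypeCheck.isSubtype(typeOf[Int], typeOf[AnyVal])  // true
```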

## How was this patch tested?

Existing tests and a new one for the new subtype checking function.

Closes #24913 from JoshRosen/joshrosen/SPARK-26555-branch-2.4-backport.

Authored-by: mwlon 
Signed-off-by: Josh Rosen 
---
 .../spark/sql/catalyst/ScalaReflection.scala   | 216 +++--
 .../spark/sql/catalyst/ScalaReflectionSuite.scala  |   6 +
 2 files changed, 124 insertions(+), 98 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index c27180e..1b186bf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -40,6 +40,9 @@ import org.apache.spark.unsafe.types.{CalendarInterval, 
UTF8String}
 trait DefinedByConstructorParams
 
 
+private[catalyst] object ScalaSubtypeLock
+
+
 /**
  * A default version of ScalaReflection that uses the runtime universe.
  */
@@ -68,19 +71,32 @@ object ScalaReflection extends ScalaReflection {
*/
   def dataTypeFor[T : TypeTag]: DataType = dataTypeFor(localTypeOf[T])
 
+  /**
+   * Synchronize to prevent concurrent usage of `<:<` operator.
+   * This operator is not thread safe in any current version of scala; i.e.
+   * (2.11.12, 2.12.8, 2.13.0-M5).
+   *
+   * See https://github.com/scala/bug/issues/10766
+   */
+  private[catalyst] def isSubtype(tpe1: `Type`, tpe2: `Type`): Boolean = {
+ScalaSubtypeLock.synchronized {
+  tpe1 <:< tpe2
+}
+  }
+
   private def dataTypeFor(tpe: `Type`): DataType = cleanUpReflectionObjects {
 tpe.dealias match {
-  case t if t <:< definitions.NullTpe => NullType
-  case t if t <:< definitions.IntTpe => IntegerType
-  case t if t <:< definitions.LongTpe => LongType
-  case t if t <:< definitions.DoubleTpe => DoubleType
-  case t if t <:< definitions.FloatTpe => FloatType
-  case t if t <:< definitions.ShortTpe => ShortType
-  case t if t <:< definitions.ByteTpe => ByteType
-  case t if t <:< definitions.BooleanTpe => BooleanType
-  case t if t <:< localTypeOf[Array[Byte]] => BinaryType
-  case t if t <:< localTypeOf[CalendarInterval] => CalendarIntervalType
-  case t if t <:< localTypeOf[Decimal] => DecimalType.SYSTEM_DEFAULT
+  case t if isSubtype(t, definitions.NullTpe) => NullType
+  case t if isSubtype(t, definitions.IntTpe) => IntegerType
+  case t if isSubtype(t, definitions.LongTpe) => LongType
+  case t if isSubtype(t, definitions.DoubleTpe) => DoubleType
+  case t if isSubtype(t, definitions.FloatTpe) => FloatType
+  case t if isSubtype(t, definitions.ShortTpe) => ShortType
+  case t if isSubtype(t, definitions.ByteTpe) => ByteType
+  case t if isSubtype(t, definitions.BooleanTpe) => BooleanType
+  case t if isSubtype(t, localTypeOf[Array[Byte]]) => BinaryType
+  case t if isSubtype(t, localTypeOf[CalendarInterval]) => 
CalendarIntervalType
+  case t if isSubtype(t, localTypeOf[Decimal]) => 
DecimalType.SYSTEM_DEFAULT
   case _ =>
 val className = getClassNameFromType(tpe)
 className match {
@@ -103,13 +119,13 @@ object ScalaReflection extends ScalaReflection {
*/
   private def arrayClassFor(tpe: `Type`): ObjectType = 
cleanUpReflectionObjects {
 val cls = tpe.dealias match {
-  case t if t <:< definitions.IntTpe => classOf[Array[Int]]
-  case t if t <:< definitions.LongTpe => classOf[Array[Long]]
-  case t if t <:< definitions.DoubleTpe => classOf[Array[Double]]
-  case t if t <:< definitions.FloatTpe => classOf[Array[Float]]
-  case t if t <:< definitions.ShortTpe => classOf[Array[Short]]
-  case t if t <:< definitions.ByteTpe => classOf[Array[Byte]]
-  case t if t <:< definitions.BooleanTpe => classOf[Array[Boolean]]
+  case t if isSubtype(t, definitions.IntTpe) => classOf[Array[Int]]
+  case t if isSubtype(t, definitions.LongTpe) => classOf

[spark] branch master updated: [SPARK-28102][CORE] Avoid performance problems when lz4-java JNI libraries fail to initialize

2019-06-19 Thread joshrosen
This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6b27ad5  [SPARK-28102][CORE] Avoid performance problems when lz4-java 
JNI libraries fail to initialize
6b27ad5 is described below

commit 6b27ad5ea11297c39ac216054f061af334387a59
Author: Josh Rosen 
AuthorDate: Wed Jun 19 15:26:26 2019 -0700

[SPARK-28102][CORE] Avoid performance problems when lz4-java JNI libraries 
fail to initialize

## What changes were proposed in this pull request?

This PR fixes a performance problem in environments where `lz4-java`'s 
native JNI libraries fail to initialize.

Spark uses `lz4-java` for LZ4 compression. Under the hood, the 
`LZ4BlockInputStream` and `LZ4BlockOutputStream` constructors call 
`LZ4Factory.fastestInstance()`, which attempts to load JNI libraries and falls 
back on Java implementations in case the JNI library cannot be loaded or 
initialized.

If the LZ4 JNI libraries are present on the library load path 
(`Native.isLoaded()`) but cannot be initialized (e.g. due to breakage caused by 
shading) then an exception will be thrown and caught, triggering fallback to 
`fastestJavaInstance()` (a non-JNI implementation).

Unfortunately, the LZ4 library does not cache the fact that the JNI library 
failed during initialization, so every call to `LZ4Factory.fastestInstance()` 
re-attempts (and fails) to initialize the native code. These initialization 
attempts are performed in a `static synchronized` method, so exceptions from 
failures are thrown while holding shared monitors and this causes 
monitor-contention performance issues. Here's an example stack trace showing 
the problem:

```java

java.lang.Throwable.fillInStackTrace(Native Method)
java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding 
Monitor(java.lang.NoClassDefFoundError@441628568})
java.lang.Throwable.(Throwable.java:265)
java.lang.Error.(Error.java:70)
java.lang.LinkageError.(LinkageError.java:55)
java.lang.NoClassDefFoundError.(NoClassDefFoundError.java:59)
shaded.net.jpountz.lz4.LZ4JNICompressor.compress(LZ4JNICompressor.java:36)
shaded.net.jpountz.lz4.LZ4Factory.(LZ4Factory.java:200)
shaded.net.jpountz.lz4.LZ4Factory.instance(LZ4Factory.java:51)
shaded.net.jpountz.lz4.LZ4Factory.nativeInstance(LZ4Factory.java:84) => 
holding Monitor(java.lang.Class@1475983836})
shaded.net.jpountz.lz4.LZ4Factory.fastestInstance(LZ4Factory.java:157)

shaded.net.jpountz.lz4.LZ4BlockOutputStream.(LZ4BlockOutputStream.java:135)

org.apache.spark.io.LZ4CompressionCodec.compressedOutputStream(CompressionCodec.scala:122)

org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:156)

org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:131)

org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:120)

org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)

org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:211)
[...]
```

To avoid this problem, this PR modifies Spark's `LZ4CompressionCodec` to 
call `fastestInstance()` itself and cache the result (which is safe because 
these factories [are thread-safe](https://github.com/lz4/lz4-java/issues/82)).
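
A sketch of that caching approach (class and field names are illustrative, not Spark's exact members; the block size and checksum seed follow lz4-java's defaults):

```scala
import java.io.OutputStream

import net.jpountz.lz4.{LZ4BlockOutputStream, LZ4Factory}
import net.jpountz.xxhash.XXHashFactory

// The factories are resolved once per codec instance, so a failed JNI
// initialization is not retried (inside lz4-java's static synchronized method)
// every time a compressed stream is constructed.
class CachedLz4Codec(blockSize: Int = 32 * 1024) {
  private val lz4Factory: LZ4Factory = LZ4Factory.fastestInstance()
  private val xxHashFactory: XXHashFactory = XXHashFactory.fastestInstance()
  private val defaultSeed = 0x9747b28c  // same default seed lz4-java uses internally

  def compressedOutputStream(s: OutputStream): OutputStream = {
    val syncFlush = false
    new LZ4BlockOutputStream(
      s,
      blockSize,
      lz4Factory.fastCompressor(),
      xxHashFactory.newStreamingHash32(defaultSeed).asChecksum(),
      syncFlush)
  }
}
```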

## How was this patch tested?

Existing unit tests.

Closes #24905 from JoshRosen/lz4-factory-flags.

Lead-authored-by: Josh Rosen 
Co-authored-by: Josh Rosen 
Signed-off-by: Josh Rosen 
---
 .../org/apache/spark/io/CompressionCodec.scala | 28 +++---
 1 file changed, 25 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 065f05e..adbd59c 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -22,7 +22,8 @@ import java.util.Locale
 
 import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
+import net.jpountz.xxhash.XXHashFactory
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
@@ -118,14 +119,35 @@ private[spark] object CompressionCodec {
 @DeveloperApi
 class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
 
+  // SPARK-28102: if the LZ4 JNI libraries fail to initialize then 
`fastestInstance()` calls fall
+  // back to non-JNI implementations but

[spark] branch master updated: [SPARK-27839][SQL] Change UTF8String.replace() to operate on UTF8 bytes

2019-06-19 Thread joshrosen
This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fc65e0f  [SPARK-27839][SQL] Change UTF8String.replace() to operate on 
UTF8 bytes
fc65e0f is described below

commit fc65e0fe2c8a114feba47d8f7b63628a676dd24c
Author: Josh Rosen 
AuthorDate: Wed Jun 19 15:21:26 2019 -0700

[SPARK-27839][SQL] Change UTF8String.replace() to operate on UTF8 bytes

## What changes were proposed in this pull request?

This PR significantly improves the performance of `UTF8String.replace()` by 
performing direct replacement over UTF8 bytes instead of decoding those bytes 
into Java Strings.

In cases where the search string is not found (i.e. no replacements are 
performed, a case which I expect to be common) this new implementation performs 
no object allocation or memory copying.

My implementation is modeled after `commons-lang3`'s 
`StringUtils.replace()` method. As part of my implementation, I needed a 
StringBuilder / resizable buffer, so I moved `UTF8StringBuilder` from the 
`catalyst` package to `unsafe`.
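
A simplified, array-based Scala sketch of that replace strategy (illustration only; the real implementation works directly on `UTF8String`'s backing memory through `UTF8StringBuilder`):

```scala
object ByteReplace {
  // Naive byte-array indexOf; UTF8String has its own find() for this purpose.
  private def indexOf(text: Array[Byte], pat: Array[Byte], from: Int): Int =
    (from to text.length - pat.length)
      .find(i => text.slice(i, i + pat.length).sameElements(pat))
      .getOrElse(-1)

  def replace(text: Array[Byte], search: Array[Byte], repl: Array[Byte]): Array[Byte] = {
    if (search.isEmpty) return text
    var start = 0
    var end = indexOf(text, search, start)
    if (end < 0) return text                 // fast path: no match, no allocation
    val buf = scala.collection.mutable.ArrayBuffer.empty[Byte]
    while (end >= 0) {
      buf ++= text.slice(start, end)         // bytes before the match
      buf ++= repl                           // the replacement
      start = end + search.length
      end = indexOf(text, search, start)
    }
    buf ++= text.slice(start, text.length)   // remaining tail
    buf.toArray
  }
}
```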

## How was this patch tested?

Copied tests from `StringExpressionSuite` to `UTF8StringSuite` and added a 
couple of new cases.

To evaluate performance, I did some quick local benchmarking by running the 
following code in `spark-shell` (with Java 1.8.0_191):

```scala
import org.apache.spark.unsafe.types.UTF8String

def benchmark(text: String, search: String, replace: String) {
  val utf8Text = UTF8String.fromString(text)
  val utf8Search = UTF8String.fromString(search)
  val utf8Replace = UTF8String.fromString(replace)

  val start = System.currentTimeMillis
  var i = 0
  while (i < 1000 * 1000 * 100) {
utf8Text.replace(utf8Search, utf8Replace)
i += 1
  }
  val end = System.currentTimeMillis

  println(end - start)
}

benchmark("ABCDEFGH", "DEF", "")  // replacement occurs
benchmark("ABCDEFGH", "Z", "")  // no replacement occurs
```

On my laptop this took ~54 / ~40 seconds before this patch's changes and ~6.5 / ~3.8 seconds 
afterwards.

Closes #24707 from JoshRosen/faster-string-replace.

Authored-by: Josh Rosen 
Signed-off-by: Josh Rosen 
---
 .../apache/spark/unsafe}/UTF8StringBuilder.java| 27 +--
 .../org/apache/spark/unsafe/types/UTF8String.java  | 26 ---
 .../apache/spark/unsafe/types/UTF8StringSuite.java | 38 ++
 .../spark/sql/catalyst/expressions/Cast.scala  |  1 +
 .../expressions/collectionOperations.scala |  1 +
 5 files changed, 86 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java
 b/common/unsafe/src/main/java/org/apache/spark/unsafe/UTF8StringBuilder.java
similarity index 80%
rename from 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java
rename to 
common/unsafe/src/main/java/org/apache/spark/unsafe/UTF8StringBuilder.java
index f0f66ba..481ea89 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/UTF8StringBuilder.java
@@ -15,9 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.catalyst.expressions.codegen;
+package org.apache.spark.unsafe;
 
-import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -34,7 +33,18 @@ public class UTF8StringBuilder {
 
   public UTF8StringBuilder() {
 // Since initial buffer size is 16 in `StringBuilder`, we set the same 
size here
-this.buffer = new byte[16];
+this(16);
+  }
+
+  public UTF8StringBuilder(int initialSize) {
+if (initialSize < 0) {
+  throw new IllegalArgumentException("Size must be non-negative");
+}
+if (initialSize > ARRAY_MAX) {
+  throw new IllegalArgumentException(
+"Size " + initialSize + " exceeded maximum size of " + ARRAY_MAX);
+}
+this.buffer = new byte[initialSize];
   }
 
   // Grows the buffer by at least `neededSize`
@@ -72,6 +82,17 @@ public class UTF8StringBuilder {
 append(UTF8String.fromString(value));
   }
 
+  public void appendBytes(Object base, long offset, int length) {
+grow(length);
+Platform.copyMemory(
+  base,
+  offset,
+  buffer,
+  cursor,
+  length);
+cursor += length;
+  }
+
   public UTF8String build() {
 return UTF8String.fromBytes(buffer, 0, totalSize());
   }
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java 
b/common/unsafe/src/main/java/org/apache/spark/unsaf

[spark] branch master updated (630dfdf -> fe5145e)

2019-06-19 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 630dfdf  [SPARK-28101][DSTREAM][TEST] Fix Flaky Test: 
`InputStreamsSuite.Modified files are correctly detected` in JDK9+
 add fe5145e  [SPARK-28109][SQL] Fix TRIM(type trimStr FROM str) returns 
incorrect value

No new revisions were added by this update.

Summary of changes:
 docs/sql-keywords.md   |  1 +
 .../apache/spark/sql/catalyst/parser/SqlBase.g4|  7 +++-
 .../spark/sql/catalyst/parser/AstBuilder.scala | 45 ++--
 .../sql/catalyst/parser/PlanParserSuite.scala  | 37 ++---
 .../parser/TableIdentifierParserSuite.scala|  1 +
 .../sql-tests/inputs/string-functions.sql  | 16 
 .../sql-tests/results/string-functions.sql.out | 48 +++---
 7 files changed, 82 insertions(+), 73 deletions(-)





[spark] branch master updated: [SPARK-28101][DSTREAM][TEST] Fix Flaky Test: `InputStreamsSuite.Modified files are correctly detected` in JDK9+

2019-06-19 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 630dfdf  [SPARK-28101][DSTREAM][TEST] Fix Flaky Test: 
`InputStreamsSuite.Modified files are correctly detected` in JDK9+
630dfdf is described below

commit 630dfdf550690b5519c69a96d7aca041587c5359
Author: Dongjoon Hyun 
AuthorDate: Wed Jun 19 07:55:00 2019 -0700

[SPARK-28101][DSTREAM][TEST] Fix Flaky Test: `InputStreamsSuite.Modified 
files are correctly detected` in JDK9+

## What changes were proposed in this pull request?

It seems that https://bugs.openjdk.java.net/browse/JDK-8068730 makes 
`InputStreamsSuite` very flaky.

(screenshot: https://user-images.githubusercontent.com/9700541/59727067-017eb780-91e9-11e9-8bb0-ac5f4c1bc44d.png)

As we can see from the Jenkins result, this can be reproduced frequently with JDK9+.
```
$ build/sbt "streaming/testOnly *.InputStreamsSuite"
[info] - Modified files are correctly detected. *** FAILED *** (134 
milliseconds)
[info]   Set("renamed") did not equal Set() (InputStreamsSuite.scala:312)
[info]   org.scalatest.exceptions.TestFailedException:
```

The reason is that `renamed.txt`'s modification time becomes greater than the clock value in 
JDK9+, so Spark ignores the file with a **not selected** message. In JDK8, the modification 
time generated by this test case doesn't have a `milliseconds` part.
```
Getting new files for time 1560896662000, ignoring files older than 
1560896659679
file:/.../streaming/subdir/renamed.txt not selected as mod time 
1560896662679 > current time 1560896662000
file:/.../streaming/subdir/existing ignored as mod time 1560896657679 <= 
ignore time 1560896659679
Finding new files took 0 ms
New files at time 1560896662000 ms:
```

## How was this patch tested?

Pass the Jenkins and manually repeat the following with JDK11 10 times.
```
$ build/sbt "streaming/testOnly *.InputStreamsSuite"
```

Closes #24904 from dongjoon-hyun/SPARK-28101.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .../src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala  | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index e81cfb5..035ed4a 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -293,8 +293,7 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
 val textPath = new Path(generatedSubDir, "renamed.txt")
 write(textPath, "renamed\n")
 val now = clock.getTimeMillis()
-val modTime = now + durationMs / 2
-fs.setTimes(textPath, modTime, modTime)
+fs.setTimes(textPath, now, now)
 val textFilestatus = fs.getFileStatus(existingFile)
 assert(textFilestatus.getModificationTime < now + durationMs)
 





[spark] branch master updated (9ec0496 -> 36b327d)

2019-06-19 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 9ec0496  [SPARK-28044][ML][PYTHON] MulticlassClassificationEvaluator 
support more metrics
 add 36b327d  [SPARK-28062][ML] Avoid unnecessary copy of coefficients 
vector in HuberAggregator

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)





[spark] branch master updated (7b7f16f -> 9ec0496)

2019-06-19 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 7b7f16f  [SPARK-27890][SQL] Improve SQL parser error message for 
character-only identifier with hyphens except those in expressions
 add 9ec0496  [SPARK-28044][ML][PYTHON] MulticlassClassificationEvaluator 
support more metrics

No new revisions were added by this update.

Summary of changes:
 .../MulticlassClassificationEvaluator.scala| 75 
 .../MultilabelClassificationEvaluator.scala| 19 ++---
 .../spark/mllib/evaluation/MulticlassMetrics.scala |  4 +-
 .../MulticlassClassificationEvaluatorSuite.scala   | 20 ++
 .../MultilabelClassificationEvaluatorSuite.scala   |  4 +-
 python/pyspark/ml/evaluation.py| 80 +-
 6 files changed, 157 insertions(+), 45 deletions(-)

