[spark] branch master updated: [SPARK-28012][SQL] Hive UDF supports struct type foldable expression
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d9697fe [SPARK-28012][SQL] Hive UDF supports struct type foldable expression d9697fe is described below commit d9697fedf5a2fa56e25849b0715d48ac8e5345f5 Author: sychen AuthorDate: Thu Jun 20 14:36:01 2019 +0900 [SPARK-28012][SQL] Hive UDF supports struct type foldable expression ## What changes were proposed in this pull request? Currently using hive udf, the parameter is struct type, there will be an exception thrown. No handler for Hive UDF 'xxxUDF': java.lang.RuntimeException: Hive doesn't support the constant type [StructType(StructField(name,StringType,true), StructField(value,DecimalType(3,1),true))] ## How was this patch tested? added new UT Closes #24846 from cxzl25/hive_udf_literal_struct_type. Authored-by: sychen Signed-off-by: HyukjinKwon --- .../main/scala/org/apache/spark/sql/hive/HiveInspectors.scala | 2 ++ .../org/apache/spark/sql/hive/execution/HiveUDFSuite.scala| 11 +++ 2 files changed, 13 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 178fced..33b5bce 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -847,6 +847,8 @@ private[hive] trait HiveInspectors { ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, valueOI, jmap) } +case Literal(_, dt: StructType) => + toInspector(dt) // We will enumerate all of the possible constant expressions, throw exception if we missed case Literal(_, dt) => sys.error(s"Hive doesn't support the constant type [$dt].") // ideally, we don't test the foldable here(but in optimizer), however, some of the diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 446267d..587eab4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -652,6 +652,17 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } } } + test("SPARK-28012 Hive UDF supports struct type foldable expression") { +withUserDefinedFunction("testUDFStructType" -> false) { + // Simulate a hive udf that supports struct parameters + sql("CREATE FUNCTION testUDFStructType AS '" + +s"${classOf[GenericUDFArray].getName}'") + checkAnswer( +sql("SELECT testUDFStructType(named_struct('name', 'xx', 'value', 1))[0].value"), +Seq(Row(1))) +} + } + } class TestPair(x: Int, y: Int) extends Writable with Serializable { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
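For SPARK-28012 above, the new test registers Hive's built-in `GenericUDFArray` and feeds it a foldable `named_struct` argument. A minimal sketch of exercising the same path from a Hive-enabled `SparkSession` (the session name `spark` and the temporary-function name are assumptions, not part of the patch):

```scala
import org.apache.spark.sql.Row

// Register a Hive UDF that accepts struct arguments (Hive's built-in array constructor).
spark.sql(
  "CREATE TEMPORARY FUNCTION structArrayUDF AS " +
    "'org.apache.hadoop.hive.ql.udf.generic.GenericUDFArray'")

// Before this fix, the constant-folded struct literal failed with
// "No handler for Hive UDF ...: Hive doesn't support the constant type [StructType(...)]".
val df = spark.sql(
  "SELECT structArrayUDF(named_struct('name', 'xx', 'value', 1))[0].value AS v")
assert(df.collect() === Array(Row(1)))
```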
[spark] branch master updated: [SPARK-23263][TEST] CTAS should update stat if autoUpdate statistics is enabled
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4968f87 [SPARK-23263][TEST] CTAS should update stat if autoUpdate statistics is enabled 4968f87 is described below commit 4968f871685c23b83690a9a1490ed886a3417d93 Author: Yuming Wang AuthorDate: Thu Jun 20 14:19:10 2019 +0900 [SPARK-23263][TEST] CTAS should update stat if autoUpdate statistics is enabled ## What changes were proposed in this pull request? The [SPARK-27403](https://issues.apache.org/jira/browse/SPARK-27403) fixed CTAS cannot update statistics even if `spark.sql.statistics.size.autoUpdate.enabled` is enabled, as mentioned in [SPARK-23263](https://issues.apache.org/jira/browse/SPARK-23263). This pr adds tests for that fix. ## How was this patch tested? N/A Closes #20430 from wangyum/SPARK-23263. Authored-by: Yuming Wang Signed-off-by: HyukjinKwon --- .../apache/spark/sql/hive/StatisticsSuite.scala| 22 ++ 1 file changed, 22 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 483bd37..7a8e257 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1431,4 +1431,26 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto assert(catalogStats.rowCount.isEmpty) } } + + test(s"CTAS should update statistics if ${SQLConf.AUTO_SIZE_UPDATE_ENABLED.key} is enabled") { +val tableName = "SPARK_23263" +Seq(false, true).foreach { isConverted => + Seq(false, true).foreach { updateEnabled => +withSQLConf( + SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> updateEnabled.toString, + HiveUtils.CONVERT_METASTORE_PARQUET.key -> isConverted.toString) { + withTable(tableName) { +sql(s"CREATE TABLE $tableName STORED AS parquet AS SELECT 'a', 'b'") +val catalogTable = getCatalogTable(tableName) +// Hive serde tables always update statistics by Hive metastore +if (!isConverted || updateEnabled) { + assert(catalogTable.stats.nonEmpty) +} else { + assert(catalogTable.stats.isEmpty) +} + } +} + } +} + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
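A minimal way to observe the behavior that the new SPARK-23263 test locks down, assuming a Hive-enabled `SparkSession` named `spark` and an illustrative table name; the exact label of the statistics row in `DESCRIBE EXTENDED` output is an assumption and may vary by version:

```scala
// Enable automatic size-statistics updates, then create a table via CTAS.
spark.sql("SET spark.sql.statistics.size.autoUpdate.enabled=true")
spark.sql("CREATE TABLE spark_23263 STORED AS parquet AS SELECT 'a' AS c1, 'b' AS c2")

// With the SPARK-27403 fix in place, the catalog entry should already carry size stats.
spark.sql("DESCRIBE EXTENDED spark_23263")
  .where("col_name = 'Statistics'")
  .show(truncate = false)
```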
[spark] branch master updated: [SPARK-28089][SQL] File source v2: support reading output of file streaming Sink
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f510761 [SPARK-28089][SQL] File source v2: support reading output of file streaming Sink f510761 is described below commit f5107614d6e38d758d9d17f2dc5d57bc9c8918a1 Author: Gengliang Wang AuthorDate: Thu Jun 20 12:57:13 2019 +0800 [SPARK-28089][SQL] File source v2: support reading output of file streaming Sink ## What changes were proposed in this pull request? File source V1 supports reading output of FileStreamSink as batch. https://github.com/apache/spark/pull/11897 We should support this in file source V2 as well. When reading with paths, we first check if there is metadata log of FileStreamSink. If yes, we use `MetadataLogFileIndex` for listing files; Otherwise, we use `InMemoryFileIndex`. ## How was this patch tested? Unit test Closes #24900 from gengliangwang/FileStreamV2. Authored-by: Gengliang Wang Signed-off-by: Wenchen Fan --- .../sql/execution/datasources/v2/FileTable.scala | 47 +++-- .../spark/sql/streaming/FileStreamSinkSuite.scala | 189 + .../apache/spark/sql/streaming/StreamSuite.scala | 11 +- .../streaming/StreamingDeduplicationSuite.scala| 50 +++--- .../spark/sql/streaming/StreamingQuerySuite.scala | 87 +- 5 files changed, 223 insertions(+), 161 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index 3b0cde5..4483f5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -20,11 +20,12 @@ import java.util import scala.collection.JavaConverters._ -import org.apache.hadoop.fs.FileStatus +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability} import org.apache.spark.sql.sources.v2.TableCapability._ import org.apache.spark.sql.types.{DataType, StructType} @@ -44,23 +45,37 @@ abstract class FileTable( val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap // Hadoop Configurations are case sensitive. val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) -val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf, - checkEmptyGlobPath = true, checkFilesExist = true) -val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) -new InMemoryFileIndex( - sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache) +if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) { + // We are reading from the results of a streaming query. We will load files from + // the metadata log instead of listing them using HDFS APIs. + new MetadataLogFileIndex(sparkSession, new Path(paths.head), +options.asScala.toMap, userSpecifiedSchema) +} else { + // This is a non-streaming file based datasource. 
+ val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf, +checkEmptyGlobPath = true, checkFilesExist = true) + val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + new InMemoryFileIndex( +sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache) +} } - lazy val dataSchema: StructType = userSpecifiedSchema.map { schema => -val partitionSchema = fileIndex.partitionSchema -val resolver = sparkSession.sessionState.conf.resolver -StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name - }.orElse { -inferSchema(fileIndex.allFiles()) - }.getOrElse { -throw new AnalysisException( - s"Unable to infer schema for $formatName. It must be specified manually.") - }.asNullable + lazy val dataSchema: StructType = { +val schema = userSpecifiedSchema.map { schema => + val partitionSchema = fileIndex.partitionSchema + val resolver = sparkSession.sessionState.conf.resolver + StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name +}.orElse { + inferSchema(fileIndex.allFiles()) +}.getOrElse { + throw new AnalysisException( +s"Unable to infer schema for $formatName. It must be s
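End to end, the behavior added by SPARK-28089 looks roughly like the sketch below: a streaming query writes Parquet through the file sink (which maintains a `_spark_metadata` log), and a later batch read of the same directory is served from that log rather than a raw listing. The paths and the `rate` source are illustrative assumptions:

```scala
val outPath = "/tmp/file-sink-output"          // assumed scratch locations
val checkpoint = "/tmp/file-sink-checkpoint"

val query = spark.readStream
  .format("rate").option("rowsPerSecond", "10").load()
  .writeStream
  .format("parquet")
  .option("checkpointLocation", checkpoint)
  .start(outPath)

query.processAllAvailable()   // let a few batches commit into _spark_metadata
query.stop()

// With this change, the V2 file source also detects the sink's metadata log,
// so the batch read sees exactly the files committed by the streaming query.
spark.read.format("parquet").load(outPath).show()
```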
[spark] branch master updated: [SPARK-27990][SQL][ML] Provide a way to recursively load data from datasource
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b276788 [SPARK-27990][SQL][ML] Provide a way to recursively load data from datasource b276788 is described below commit b276788d57b270d455ef6a7c5ed6cf8a74885dde Author: WeichenXu AuthorDate: Thu Jun 20 12:43:01 2019 +0800 [SPARK-27990][SQL][ML] Provide a way to recursively load data from datasource ## What changes were proposed in this pull request? Provide a way to recursively load data from datasource. I add a "recursiveFileLookup" option. When "recursiveFileLookup" option turn on, then partition inferring is turned off and all files from the directory will be loaded recursively. If some datasource explicitly specify the partitionSpec, then if user turn on "recursive" option, then exception will be thrown. ## How was this patch tested? Unit tests. Please review https://spark.apache.org/contributing.html before opening a pull request. Closes #24830 from WeichenXu123/recursive_ds. Authored-by: WeichenXu Signed-off-by: Wenchen Fan --- .../ml/source/image/ImageFileFormatSuite.scala | 1 + .../datasources/PartitioningAwareFileIndex.scala | 48 +-- .../spark/sql/FileBasedDataSourceSuite.scala | 70 ++ 3 files changed, 101 insertions(+), 18 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala index 38e2513..38bb246 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala @@ -30,6 +30,7 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext { // Single column of images named "image" private lazy val imagePath = "../data/mllib/images/partitioned" + private lazy val recursiveImagePath = "../data/mllib/images" test("image datasource count test") { val df1 = spark.read.format("image").load(imagePath) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 3c93255..3adec2f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -62,6 +62,10 @@ abstract class PartitioningAwareFileIndex( pathGlobFilter.forall(_.accept(file.getPath)) } + protected lazy val recursiveFileLookup = { +parameters.getOrElse("recursiveFileLookup", "false").toBoolean + } + override def listFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { def isNonEmptyFile(f: FileStatus): Boolean = { @@ -70,6 +74,10 @@ abstract class PartitioningAwareFileIndex( val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) { PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil } else { + if (recursiveFileLookup) { +throw new IllegalArgumentException( + "Datasource with partition do not allow recursive file loading.") + } prunePartitions(partitionFilters, partitionSpec()).map { case PartitionPath(values, path) => val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match { @@ -95,7 +103,7 @@ abstract class 
PartitioningAwareFileIndex( override def sizeInBytes: Long = allFiles().map(_.getLen).sum def allFiles(): Seq[FileStatus] = { -val files = if (partitionSpec().partitionColumns.isEmpty) { +val files = if (partitionSpec().partitionColumns.isEmpty && !recursiveFileLookup) { // For each of the root input paths, get the list of files inside them rootPaths.flatMap { path => // Make the path qualified (consistent with listLeafFiles and bulkListLeafFiles). @@ -128,23 +136,27 @@ abstract class PartitioningAwareFileIndex( } protected def inferPartitioning(): PartitionSpec = { -// We use leaf dirs containing data files to discover the schema. -val leafDirs = leafDirToChildrenFiles.filter { case (_, files) => - files.exists(f => isDataPath(f.getPath)) -}.keys.toSeq - -val caseInsensitiveOptions = CaseInsensitiveMap(parameters) -val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) - .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) - -PartitioningUtils.parsePartitions( - leafDirs, - typeInference = sparkSession.sessionSta
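A minimal usage sketch of the new option from SPARK-27990 (the directory layout under the load path is an assumption); note from the diff that combining it with an explicit partition spec raises an `IllegalArgumentException`:

```scala
// With recursiveFileLookup=true, partition inference is turned off and files are
// gathered from every nested subdirectory under the given path.
val images = spark.read.format("image")
  .option("recursiveFileLookup", "true")
  .load("../data/mllib/images")   // parent directory with nested subfolders

images.select("image.origin").show(truncate = false)
```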
[spark] branch master updated: [SPARK-28112][TEST] Fix Kryo exception perf. bottleneck in tests due to absence of ML/MLlib classes
This is an automated email from the ASF dual-hosted git repository. joshrosen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ec032ce [SPARK-28112][TEST] Fix Kryo exception perf. bottleneck in tests due to absence of ML/MLlib classes ec032ce is described below commit ec032cea4f91a5ee6ce51e2216de23104486a053 Author: Josh Rosen AuthorDate: Wed Jun 19 19:06:22 2019 -0700 [SPARK-28112][TEST] Fix Kryo exception perf. bottleneck in tests due to absence of ML/MLlib classes ## What changes were proposed in this pull request? In a nutshell, it looks like the absence of ML / MLlib classes on the classpath causes code in KryoSerializer to throw and catch ClassNotFoundExceptions whenever instantiating a new serializer in newInstance(). This isn't a performance problem in production (since MLlib is on the classpath there) but it's a huge issue in tests and appears to account for an enormous amount of test time We can address this problem by reducing the total number of ClassNotFoundExceptions by performing the class existence checks once and storing the results in KryoSerializer instances rather than repeating the checks on each newInstance() call. ## How was this patch tested? The existing tests. Authored-by: Josh Rosen Closes #24916 from gatorsmile/kryoException. Lead-authored-by: Josh Rosen Co-authored-by: gatorsmile Signed-off-by: Josh Rosen --- .../apache/spark/serializer/KryoSerializer.scala | 78 +- 1 file changed, 45 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 3969106..20774c8 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -212,40 +212,8 @@ class KryoSerializer(conf: SparkConf) // We can't load those class directly in order to avoid unnecessary jar dependencies. // We load them safely, ignore it if the class not found. 
-Seq( - "org.apache.spark.sql.catalyst.expressions.UnsafeRow", - "org.apache.spark.sql.catalyst.expressions.UnsafeArrayData", - "org.apache.spark.sql.catalyst.expressions.UnsafeMapData", - - "org.apache.spark.ml.attribute.Attribute", - "org.apache.spark.ml.attribute.AttributeGroup", - "org.apache.spark.ml.attribute.BinaryAttribute", - "org.apache.spark.ml.attribute.NominalAttribute", - "org.apache.spark.ml.attribute.NumericAttribute", - - "org.apache.spark.ml.feature.Instance", - "org.apache.spark.ml.feature.LabeledPoint", - "org.apache.spark.ml.feature.OffsetInstance", - "org.apache.spark.ml.linalg.DenseMatrix", - "org.apache.spark.ml.linalg.DenseVector", - "org.apache.spark.ml.linalg.Matrix", - "org.apache.spark.ml.linalg.SparseMatrix", - "org.apache.spark.ml.linalg.SparseVector", - "org.apache.spark.ml.linalg.Vector", - "org.apache.spark.ml.stat.distribution.MultivariateGaussian", - "org.apache.spark.ml.tree.impl.TreePoint", - "org.apache.spark.mllib.clustering.VectorWithNorm", - "org.apache.spark.mllib.linalg.DenseMatrix", - "org.apache.spark.mllib.linalg.DenseVector", - "org.apache.spark.mllib.linalg.Matrix", - "org.apache.spark.mllib.linalg.SparseMatrix", - "org.apache.spark.mllib.linalg.SparseVector", - "org.apache.spark.mllib.linalg.Vector", - "org.apache.spark.mllib.regression.LabeledPoint", - "org.apache.spark.mllib.stat.distribution.MultivariateGaussian" -).foreach { name => +KryoSerializer.loadableSparkClasses.foreach { clazz => try { -val clazz = Utils.classForName(name) kryo.register(clazz) } catch { case NonFatal(_) => // do nothing @@ -516,6 +484,50 @@ private[serializer] object KryoSerializer { } } ) + + // classForName() is expensive in case the class is not found, so we filter the list of + // SQL / ML / MLlib classes once and then re-use that filtered list in newInstance() calls. + private lazy val loadableSparkClasses: Seq[Class[_]] = { +Seq( + "org.apache.spark.sql.catalyst.expressions.UnsafeRow", + "org.apache.spark.sql.catalyst.expressions.UnsafeArrayData", + "org.apache.spark.sql.catalyst.expressions.UnsafeMapData", + + "org.apache.spark.ml.attribute.Attribute", + "org.apache.spark.ml.attribute.AttributeGroup", + "org.apache.spark.ml.attribute.BinaryAttribute", + "org.apache.spark.ml.attribute.NominalAttribute", + "org.apache.spark.ml.attribute.NumericAttribute", + + "org.apache.spark.ml.feature.Instance", + "org.apache.spark.ml.feature.LabeledPoint", + "org.apache.spark.ml.feature.OffsetInstance",
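The essence of the SPARK-28112 fix is to pay the `ClassNotFoundException` cost once per JVM instead of once per `newInstance()` call. A generic sketch of that caching pattern in plain Scala (class names are illustrative; this is not Spark's exact code):

```scala
object OptionalClasses {
  private val candidateNames = Seq(
    "org.apache.spark.ml.linalg.DenseVector",               // present only with MLlib on the classpath
    "org.apache.spark.sql.catalyst.expressions.UnsafeRow")

  // Evaluated once per JVM: lookups that miss throw (and are swallowed) a single time.
  lazy val loadable: Seq[Class[_]] = candidateNames.flatMap { name =>
    try Some(Class.forName(name)) catch { case _: ClassNotFoundException => None }
  }
}

// Per-instance setup then just registers the cached classes, e.g.:
//   OptionalClasses.loadable.foreach(clazz => kryo.register(clazz))
```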
[spark] branch branch-2.4 updated: [SPARK-26555][SQL][BRANCH-2.4] make ScalaReflection subtype checking thread safe
This is an automated email from the ASF dual-hosted git repository. joshrosen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new ba7f61e [SPARK-26555][SQL][BRANCH-2.4] make ScalaReflection subtype checking thread safe ba7f61e is described below commit ba7f61e25d58aa379f94a23b03503a25574529bc Author: mwlon AuthorDate: Wed Jun 19 19:03:35 2019 -0700 [SPARK-26555][SQL][BRANCH-2.4] make ScalaReflection subtype checking thread safe This is a Spark 2.4.x backport of #24085. Original description follows below: ## What changes were proposed in this pull request? Make ScalaReflection subtype checking thread safe by adding a lock. There is a thread safety bug in the <:< operator in all versions of scala (https://github.com/scala/bug/issues/10766). ## How was this patch tested? Existing tests and a new one for the new subtype checking function. Closes #24913 from JoshRosen/joshrosen/SPARK-26555-branch-2.4-backport. Authored-by: mwlon Signed-off-by: Josh Rosen --- .../spark/sql/catalyst/ScalaReflection.scala | 216 +++-- .../spark/sql/catalyst/ScalaReflectionSuite.scala | 6 + 2 files changed, 124 insertions(+), 98 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index c27180e..1b186bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -40,6 +40,9 @@ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} trait DefinedByConstructorParams +private[catalyst] object ScalaSubtypeLock + + /** * A default version of ScalaReflection that uses the runtime universe. */ @@ -68,19 +71,32 @@ object ScalaReflection extends ScalaReflection { */ def dataTypeFor[T : TypeTag]: DataType = dataTypeFor(localTypeOf[T]) + /** + * Synchronize to prevent concurrent usage of `<:<` operator. + * This operator is not thread safe in any current version of scala; i.e. + * (2.11.12, 2.12.8, 2.13.0-M5). 
+ * + * See https://github.com/scala/bug/issues/10766 + */ + private[catalyst] def isSubtype(tpe1: `Type`, tpe2: `Type`): Boolean = { +ScalaSubtypeLock.synchronized { + tpe1 <:< tpe2 +} + } + private def dataTypeFor(tpe: `Type`): DataType = cleanUpReflectionObjects { tpe.dealias match { - case t if t <:< definitions.NullTpe => NullType - case t if t <:< definitions.IntTpe => IntegerType - case t if t <:< definitions.LongTpe => LongType - case t if t <:< definitions.DoubleTpe => DoubleType - case t if t <:< definitions.FloatTpe => FloatType - case t if t <:< definitions.ShortTpe => ShortType - case t if t <:< definitions.ByteTpe => ByteType - case t if t <:< definitions.BooleanTpe => BooleanType - case t if t <:< localTypeOf[Array[Byte]] => BinaryType - case t if t <:< localTypeOf[CalendarInterval] => CalendarIntervalType - case t if t <:< localTypeOf[Decimal] => DecimalType.SYSTEM_DEFAULT + case t if isSubtype(t, definitions.NullTpe) => NullType + case t if isSubtype(t, definitions.IntTpe) => IntegerType + case t if isSubtype(t, definitions.LongTpe) => LongType + case t if isSubtype(t, definitions.DoubleTpe) => DoubleType + case t if isSubtype(t, definitions.FloatTpe) => FloatType + case t if isSubtype(t, definitions.ShortTpe) => ShortType + case t if isSubtype(t, definitions.ByteTpe) => ByteType + case t if isSubtype(t, definitions.BooleanTpe) => BooleanType + case t if isSubtype(t, localTypeOf[Array[Byte]]) => BinaryType + case t if isSubtype(t, localTypeOf[CalendarInterval]) => CalendarIntervalType + case t if isSubtype(t, localTypeOf[Decimal]) => DecimalType.SYSTEM_DEFAULT case _ => val className = getClassNameFromType(tpe) className match { @@ -103,13 +119,13 @@ object ScalaReflection extends ScalaReflection { */ private def arrayClassFor(tpe: `Type`): ObjectType = cleanUpReflectionObjects { val cls = tpe.dealias match { - case t if t <:< definitions.IntTpe => classOf[Array[Int]] - case t if t <:< definitions.LongTpe => classOf[Array[Long]] - case t if t <:< definitions.DoubleTpe => classOf[Array[Double]] - case t if t <:< definitions.FloatTpe => classOf[Array[Float]] - case t if t <:< definitions.ShortTpe => classOf[Array[Short]] - case t if t <:< definitions.ByteTpe => classOf[Array[Byte]] - case t if t <:< definitions.BooleanTpe => classOf[Array[Boolean]] + case t if isSubtype(t, definitions.IntTpe) => classOf[Array[Int]] + case t if isSubtype(t, definitions.LongTpe) => classOf
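The core of the SPARK-26555 backport is a single shared monitor around the `<:<` operator. A standalone sketch of the same guard outside Spark (the object and method names mirror the patch, but this is not the actual `ScalaReflection` trait):

```scala
import scala.reflect.runtime.universe._

object TypeChecks {
  // One monitor shared by every subtype check in the process.
  private object ScalaSubtypeLock

  // `<:<` touches shared, unsynchronized reflection state (scala/bug#10766),
  // so all callers are serialized through the lock.
  def isSubtype(tpe1: Type, tpe2: Type): Boolean =
    ScalaSubtypeLock.synchronized { tpe1 <:< tpe2 }
}

// Usage, replacing bare `t <:< ...` pattern guards:
//   case t if TypeChecks.isSubtype(t, typeOf[Array[Byte]]) => BinaryType
```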
[spark] branch master updated: [SPARK-28102][CORE] Avoid performance problems when lz4-java JNI libraries fail to initialize
This is an automated email from the ASF dual-hosted git repository. joshrosen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6b27ad5 [SPARK-28102][CORE] Avoid performance problems when lz4-java JNI libraries fail to initialize 6b27ad5 is described below commit 6b27ad5ea11297c39ac216054f061af334387a59 Author: Josh Rosen AuthorDate: Wed Jun 19 15:26:26 2019 -0700 [SPARK-28102][CORE] Avoid performance problems when lz4-java JNI libraries fail to initialize ## What changes were proposed in this pull request? This PR fixes a performance problem in environments where `lz4-java`'s native JNI libraries fail to initialize. Spark's uses `lz4-java` for LZ4 compression. Under the hood, the `LZ4BlockInputStream` and `LZ4BlockOutputStream` constructors call `LZ4Factory.fastestInstance()`, which attempts to load JNI libraries and falls back on Java implementations in case the JNI library cannot be loaded or initialized. If the LZ4 JNI libraries are present on the library load path (`Native.isLoaded()`) but cannot be initialized (e.g. due to breakage caused by shading) then an exception will be thrown and caught, triggering fallback to `fastestJavaInstance()` (a non-JNI implementation). Unfortunately, the LZ4 library does not cache the fact that the JNI library failed during initialization, so every call to `LZ4Factory.fastestInstance()` re-attempts (and fails) to initialize the native code. These initialization attempts are performed in a `static synchronized` method, so exceptions from failures are thrown while holding shared monitors and this causes monitor-contention performance issues. Here's an example stack trace showing the problem: ```java java.lang.Throwable.fillInStackTrace(Native Method) java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding Monitor(java.lang.NoClassDefFoundError441628568}) java.lang.Throwable.(Throwable.java:265) java.lang.Error.(Error.java:70) java.lang.LinkageError.(LinkageError.java:55) java.lang.NoClassDefFoundError.(NoClassDefFoundError.java:59) shaded.net.jpountz.lz4.LZ4JNICompressor.compress(LZ4JNICompressor.java:36) shaded.net.jpountz.lz4.LZ4Factory.(LZ4Factory.java:200) shaded.net.jpountz.lz4.LZ4Factory.instance(LZ4Factory.java:51) shaded.net.jpountz.lz4.LZ4Factory.nativeInstance(LZ4Factory.java:84) => holding Monitor(java.lang.Class1475983836}) shaded.net.jpountz.lz4.LZ4Factory.fastestInstance(LZ4Factory.java:157) shaded.net.jpountz.lz4.LZ4BlockOutputStream.(LZ4BlockOutputStream.java:135) org.apache.spark.io.LZ4CompressionCodec.compressedOutputStream(CompressionCodec.scala:122) org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:156) org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:131) org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:120) org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249) org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:211) [...] ``` To avoid this problem, this PR modifies Spark's `LZ4CompressionCodec` to call `fastestInstance()` itself and cache the result (which is safe because these factories [are thread-safe](https://github.com/lz4/lz4-java/issues/82)). ## How was this patch tested? Existing unit tests. Closes #24905 from JoshRosen/lz4-factory-flags. 
Lead-authored-by: Josh Rosen Co-authored-by: Josh Rosen Signed-off-by: Josh Rosen --- .../org/apache/spark/io/CompressionCodec.scala | 28 +++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 065f05e..adbd59c 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -22,7 +22,8 @@ import java.util.Locale import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream} import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} -import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} +import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory} +import net.jpountz.xxhash.XXHashFactory import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf @@ -118,14 +119,35 @@ private[spark] object CompressionCodec { @DeveloperApi class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec { + // SPARK-28102: if the LZ4 JNI libraries fail to initialize then `fastestInstance()` calls fall + // back to non-JNI implementations but
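The shape of the SPARK-28102 fix lifts out cleanly: resolve `LZ4Factory.fastestInstance()` once, cache it, and hand its compressor and decompressor to the block streams, instead of letting each stream constructor repeat the (possibly failing) JNI probe. A simplified sketch, not Spark's exact `LZ4CompressionCodec` (the block-size default and checksum arguments are assumptions, omitted for brevity):

```scala
import java.io.{InputStream, OutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}

class CachedLz4Codec(blockSize: Int = 32 * 1024) {
  // fastestInstance() tries the JNI implementation and falls back to pure Java.
  // The returned factory is thread-safe, so cache it and pay the probe a single time.
  @transient private lazy val lz4Factory: LZ4Factory = LZ4Factory.fastestInstance()

  def compressedOutputStream(s: OutputStream): OutputStream =
    new LZ4BlockOutputStream(s, blockSize, lz4Factory.fastCompressor())

  def compressedInputStream(s: InputStream): InputStream =
    new LZ4BlockInputStream(s, lz4Factory.fastDecompressor())
}
```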
[spark] branch master updated: [SPARK-27839][SQL] Change UTF8String.replace() to operate on UTF8 bytes
This is an automated email from the ASF dual-hosted git repository. joshrosen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new fc65e0f [SPARK-27839][SQL] Change UTF8String.replace() to operate on UTF8 bytes fc65e0f is described below commit fc65e0fe2c8a114feba47d8f7b63628a676dd24c Author: Josh Rosen AuthorDate: Wed Jun 19 15:21:26 2019 -0700 [SPARK-27839][SQL] Change UTF8String.replace() to operate on UTF8 bytes ## What changes were proposed in this pull request? This PR significantly improves the performance of `UTF8String.replace()` by performing direct replacement over UTF8 bytes instead of decoding those bytes into Java Strings. In cases where the search string is not found (i.e. no replacements are performed, a case which I expect to be common) this new implementation performs no object allocation or memory copying. My implementation is modeled after `commons-lang3`'s `StringUtils.replace()` method. As part of my implementation, I needed a StringBuilder / resizable buffer, so I moved `UTF8StringBuilder` from the `catalyst` package to `unsafe`. ## How was this patch tested? Copied tests from `StringExpressionSuite` to `UTF8StringSuite` and added a couple of new cases. To evaluate performance, I did some quick local benchmarking by running the following code in `spark-shell` (with Java 1.8.0_191): ```scala import org.apache.spark.unsafe.types.UTF8String def benchmark(text: String, search: String, replace: String) { val utf8Text = UTF8String.fromString(text) val utf8Search = UTF8String.fromString(search) val utf8Replace = UTF8String.fromString(replace) val start = System.currentTimeMillis var i = 0 while (i < 1000 * 1000 * 100) { utf8Text.replace(utf8Search, utf8Replace) i += 1 } val end = System.currentTimeMillis println(end - start) } benchmark("ABCDEFGH", "DEF", "") // replacement occurs benchmark("ABCDEFGH", "Z", "") // no replacement occurs ``` On my laptop this took ~54 / ~40 seconds seconds before this patch's changes and ~6.5 / ~3.8 seconds afterwards. Closes #24707 from JoshRosen/faster-string-replace. Authored-by: Josh Rosen Signed-off-by: Josh Rosen --- .../apache/spark/unsafe}/UTF8StringBuilder.java| 27 +-- .../org/apache/spark/unsafe/types/UTF8String.java | 26 --- .../apache/spark/unsafe/types/UTF8StringSuite.java | 38 ++ .../spark/sql/catalyst/expressions/Cast.scala | 1 + .../expressions/collectionOperations.scala | 1 + 5 files changed, 86 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/UTF8StringBuilder.java similarity index 80% rename from sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java rename to common/unsafe/src/main/java/org/apache/spark/unsafe/UTF8StringBuilder.java index f0f66ba..481ea89 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/UTF8StringBuilder.java @@ -15,9 +15,8 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.catalyst.expressions.codegen; +package org.apache.spark.unsafe; -import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.types.UTF8String; @@ -34,7 +33,18 @@ public class UTF8StringBuilder { public UTF8StringBuilder() { // Since initial buffer size is 16 in `StringBuilder`, we set the same size here -this.buffer = new byte[16]; +this(16); + } + + public UTF8StringBuilder(int initialSize) { +if (initialSize < 0) { + throw new IllegalArgumentException("Size must be non-negative"); +} +if (initialSize > ARRAY_MAX) { + throw new IllegalArgumentException( +"Size " + initialSize + " exceeded maximum size of " + ARRAY_MAX); +} +this.buffer = new byte[initialSize]; } // Grows the buffer by at least `neededSize` @@ -72,6 +82,17 @@ public class UTF8StringBuilder { append(UTF8String.fromString(value)); } + public void appendBytes(Object base, long offset, int length) { +grow(length); +Platform.copyMemory( + base, + offset, + buffer, + cursor, + length); +cursor += length; + } + public UTF8String build() { return UTF8String.fromBytes(buffer, 0, totalSize()); } diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsaf
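The replacement strategy from SPARK-27839 carries over to any byte-oriented string type: scan for the search bytes, copy unchanged segments into a growable buffer, and return the input untouched when there is no match (the allocation-free fast path that dominates in practice). Matching raw bytes is safe for valid UTF-8 because a match can only begin on a character boundary. A standalone sketch over plain `Array[Byte]`, not Spark's `UTF8String`/`UTF8StringBuilder`:

```scala
import java.io.ByteArrayOutputStream

def replaceBytes(text: Array[Byte], search: Array[Byte], replace: Array[Byte]): Array[Byte] = {
  if (search.isEmpty) return text

  // Naive byte search; Spark's version walks the UTF8String bytes directly.
  def indexOf(from: Int): Int = {
    var i = from
    while (i <= text.length - search.length) {
      var j = 0
      while (j < search.length && text(i + j) == search(j)) j += 1
      if (j == search.length) return i
      i += 1
    }
    -1
  }

  var matchAt = indexOf(0)
  if (matchAt < 0) return text                    // common case: no allocation, no copying

  val out = new ByteArrayOutputStream(text.length)
  var copied = 0
  while (matchAt >= 0) {
    out.write(text, copied, matchAt - copied)     // copy the untouched segment
    out.write(replace, 0, replace.length)
    copied = matchAt + search.length
    matchAt = indexOf(copied)
  }
  out.write(text, copied, text.length - copied)   // tail after the last match
  out.toByteArray
}
```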
[spark] branch master updated (630dfdf -> fe5145e)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 630dfdf  [SPARK-28101][DSTREAM][TEST] Fix Flaky Test: `InputStreamsSuite.Modified files are correctly detected` in JDK9+
     add fe5145e  [SPARK-28109][SQL] Fix TRIM(type trimStr FROM str) returns incorrect value

No new revisions were added by this update.

Summary of changes:
 docs/sql-keywords.md                               |  1 +
 .../apache/spark/sql/catalyst/parser/SqlBase.g4    |  7 +++-
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 45 ++--
 .../sql/catalyst/parser/PlanParserSuite.scala      | 37 ++---
 .../parser/TableIdentifierParserSuite.scala        |  1 +
 .../sql-tests/inputs/string-functions.sql          | 16
 .../sql-tests/results/string-functions.sql.out     | 48 +++---
 7 files changed, 82 insertions(+), 73 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
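For reference, SPARK-28109 concerns the `TRIM(BOTH|LEADING|TRAILING trimStr FROM srcStr)` form. The expected values below follow the ANSI semantics the fix restores (written from the SQL standard, not copied from the patch's golden files):

```scala
spark.sql("SELECT TRIM(BOTH 'xy' FROM 'yxTomxx')").show()      // Tom
spark.sql("SELECT TRIM(LEADING 'xy' FROM 'yxTomxx')").show()   // Tomxx
spark.sql("SELECT TRIM(TRAILING 'xy' FROM 'yxTomxx')").show()  // yxTom
```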
[spark] branch master updated: [SPARK-28101][DSTREAM][TEST] Fix Flaky Test: `InputStreamsSuite.Modified files are correctly detected` in JDK9+
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 630dfdf [SPARK-28101][DSTREAM][TEST] Fix Flaky Test: `InputStreamsSuite.Modified files are correctly detected` in JDK9+ 630dfdf is described below commit 630dfdf550690b5519c69a96d7aca041587c5359 Author: Dongjoon Hyun AuthorDate: Wed Jun 19 07:55:00 2019 -0700 [SPARK-28101][DSTREAM][TEST] Fix Flaky Test: `InputStreamsSuite.Modified files are correctly detected` in JDK9+ ## What changes were proposed in this pull request? It seems that https://bugs.openjdk.java.net/browse/JDK-8068730 makes `InputStreamsSuite` very flaky. https://user-images.githubusercontent.com/9700541/59727067-017eb780-91e9-11e9-8bb0-ac5f4c1bc44d.png";> As we can see the Jenkins result, this can be reproduced frequently with JDK9+. ``` $ build/sbt "streaming/testOnly *.InputStreamsSuite" [info] - Modified files are correctly detected. *** FAILED *** (134 milliseconds) [info] Set("renamed") did not equal Set() (InputStreamsSuite.scala:312) [info] org.scalatest.exceptions.TestFailedException: ``` The reason is the `renamed.txt`'s modification time becomes greater than the clock in JDK9+ and Spark ignored it with **not selected** message. In JDK8, the modification time generated by this test case doesn't have `milliseconds` part. ``` Getting new files for time 1560896662000, ignoring files older than 1560896659679 file:/.../streaming/subdir/renamed.txt not selected as mod time 1560896662679 > current time 1560896662000 file:/.../streaming/subdir/existing ignored as mod time 1560896657679 <= ignore time 1560896659679 Finding new files took 0 ms New files at time 1560896662000 ms: ``` ## How was this patch tested? Pass the Jenkins and manually repeat the following with JDK11 10 times. ``` $ build/sbt "streaming/testOnly *.InputStreamsSuite" ``` Closes #24904 from dongjoon-hyun/SPARK-28101. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .../src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index e81cfb5..035ed4a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -293,8 +293,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val textPath = new Path(generatedSubDir, "renamed.txt") write(textPath, "renamed\n") val now = clock.getTimeMillis() -val modTime = now + durationMs / 2 -fs.setTimes(textPath, modTime, modTime) +fs.setTimes(textPath, now, now) val textFilestatus = fs.getFileStatus(existingFile) assert(textFilestatus.getModificationTime < now + durationMs) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
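The pitfall behind SPARK-28101 generalizes: on JDK9+ a file's modification time keeps millisecond precision, so stamping it ahead of "now" can push it past the streaming batch clock and the file is silently skipped as "not selected". A sketch of the safer pattern used by the fix (the path and configuration are illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val renamed = new Path("/tmp/streaming-input/subdir/renamed.txt")

// Pin the modification time to "now" instead of now + offset, so it can never
// exceed the batch time and be rejected as "mod time ... > current time ...".
val now = System.currentTimeMillis()
fs.setTimes(renamed, now, now)   // (path, modification time, access time)
```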
[spark] branch master updated (9ec0496 -> 36b327d)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 9ec0496  [SPARK-28044][ML][PYTHON] MulticlassClassificationEvaluator support more metrics
     add 36b327d  [SPARK-28062][ML] Avoid unnecessary copy of coefficients vector in HuberAggregator

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (7b7f16f -> 9ec0496)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 7b7f16f  [SPARK-27890][SQL] Improve SQL parser error message for character-only identifier with hyphens except those in expressions
     add 9ec0496  [SPARK-28044][ML][PYTHON] MulticlassClassificationEvaluator support more metrics

No new revisions were added by this update.

Summary of changes:
 .../MulticlassClassificationEvaluator.scala        | 75
 .../MultilabelClassificationEvaluator.scala        | 19 ++---
 .../spark/mllib/evaluation/MulticlassMetrics.scala |  4 +-
 .../MulticlassClassificationEvaluatorSuite.scala   | 20 ++
 .../MultilabelClassificationEvaluatorSuite.scala   |  4 +-
 python/pyspark/ml/evaluation.py                    | 80 +-
 6 files changed, 157 insertions(+), 45 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
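SPARK-28044 extends `MulticlassClassificationEvaluator` with additional metrics on both the Scala and Python sides. A basic usage sketch (the `predictions` DataFrame with `label`/`prediction` columns is assumed, and only long-standing metric names are shown; the exact names added by this change are listed in the evaluator's updated docs):

```scala
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

val accuracy = evaluator.evaluate(predictions)
val f1 = evaluator.setMetricName("f1").evaluate(predictions)
```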