spark git commit: [SPARK-23436][SQL] Infer partition as Date only if it can be casted to Date
Repository: spark
Updated Branches:
  refs/heads/master f5850e789 -> 651b0277f


[SPARK-23436][SQL] Infer partition as Date only if it can be casted to Date

## What changes were proposed in this pull request?

Before the patch, Spark could infer as Date a partition value which cannot be cast to Date (this can happen when there are extra characters after a valid date, like `2018-02-15AAA`). When this happens and the input format has metadata which defines the schema of the table, `null` is returned as the value for the partition column, because the `cast` operator used in `PartitioningAwareFileIndex.inferPartitioning` is unable to convert the value. The PR makes the partition inference check that values can actually be cast to Date and Timestamp before inferring those datatypes for them.

## How was this patch tested?

added UT

Author: Marco Gaido

Closes #20621 from mgaido91/SPARK-23436.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/651b0277
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/651b0277
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/651b0277

Branch: refs/heads/master
Commit: 651b0277fe989119932d5ae1ef729c9768aa018d
Parents: f5850e7
Author: Marco Gaido
Authored: Tue Feb 20 13:56:38 2018 +0800
Committer: Wenchen Fan
Committed: Tue Feb 20 13:56:38 2018 +0800

----------------------------------------------------------------------
 .../datasources/PartitioningUtils.scala          | 40 +++-
 .../ParquetPartitionDiscoverySuite.scala         | 14 +++
 2 files changed, 44 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/651b0277/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 472bf82..379acb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -407,6 +407,34 @@ object PartitioningUtils {
       Literal(bigDecimal)
     }
 
+    val dateTry = Try {
+      // try and parse the date, if no exception occurs this is a candidate to be resolved as
+      // DateType
+      DateTimeUtils.getThreadLocalDateFormat.parse(raw)
+      // SPARK-23436: Casting the string to date may still return null if a bad Date is provided.
+      // This can happen since DateFormat.parse may not use the entire text of the given string:
+      // so if there are extra-characters after the date, it returns correctly.
+      // We need to check that we can cast the raw string since we later can use Cast to get
+      // the partition values with the right DataType (see
+      // org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning)
+      val dateValue = Cast(Literal(raw), DateType).eval()
+      // Disallow DateType if the cast returned null
+      require(dateValue != null)
+      Literal.create(dateValue, DateType)
+    }
+
+    val timestampTry = Try {
+      val unescapedRaw = unescapePathName(raw)
+      // try and parse the date, if no exception occurs this is a candidate to be resolved as
+      // TimestampType
+      DateTimeUtils.getThreadLocalTimestampFormat(timeZone).parse(unescapedRaw)
+      // SPARK-23436: see comment for date
+      val timestampValue = Cast(Literal(unescapedRaw), TimestampType, Some(timeZone.getID)).eval()
+      // Disallow TimestampType if the cast returned null
+      require(timestampValue != null)
+      Literal.create(timestampValue, TimestampType)
+    }
+
     if (typeInference) {
       // First tries integral types
       Try(Literal.create(Integer.parseInt(raw), IntegerType))
@@ -415,16 +443,8 @@ object PartitioningUtils {
         // Then falls back to fractional types
         .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
         // Then falls back to date/timestamp types
-        .orElse(Try(
-          Literal.create(
-            DateTimeUtils.getThreadLocalTimestampFormat(timeZone)
-              .parse(unescapePathName(raw)).getTime * 1000L,
-            TimestampType)))
-        .orElse(Try(
-          Literal.create(
-            DateTimeUtils.millisToDays(
-              DateTimeUtils.getThreadLocalDateFormat.parse(raw).getTime),
-            DateType)))
+        .orElse(timestampTry)
+        .orElse(dateTry)
         // Then falls back to string
         .getOrElse {
           if (raw == DEFAULT_PARTITION_NAME) {

http://git-wip-us.apache.org/repos/asf/spark/blob/651b0277/sql/core/src/test/scala/org/apache/spark/sql/execution/d
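To make the failure mode concrete: `java.text.DateFormat.parse(String)` only needs to consume a prefix of its input, so a partition value like `2018-02-15AAA` parses "successfully" even though it cannot be cast to a `DateType` value. Below is a minimal, JDK-only Scala sketch of that behaviour; the object and helper names (`PartitionDateInferenceSketch`, `parsesAsPrefix`, `isFullyParsedDate`) are hypothetical, and the actual patch validates via Catalyst's `Cast` (as in the diff above) rather than `ParsePosition`.

```scala
import java.text.{ParsePosition, SimpleDateFormat}

object PartitionDateInferenceSketch {
  private val format = new SimpleDateFormat("yyyy-MM-dd")

  // DateFormat.parse(String) stops at the first unparseable character, so a value
  // with trailing garbage such as "2018-02-15AAA" still parses without an exception.
  def parsesAsPrefix(raw: String): Boolean =
    try { format.parse(raw); true }
    catch { case _: java.text.ParseException => false }

  // A stricter check in the spirit of SPARK-23436: only treat the value as a date
  // if the parser consumes the whole string (hypothetical helper, not Spark code).
  def isFullyParsedDate(raw: String): Boolean = {
    val pos = new ParsePosition(0)
    format.parse(raw, pos) != null && pos.getIndex == raw.length
  }

  def main(args: Array[String]): Unit = {
    println(parsesAsPrefix("2018-02-15AAA"))    // true  -> would be wrongly inferred as DateType
    println(isFullyParsedDate("2018-02-15AAA")) // false -> rejected, falls back to StringType
    println(isFullyParsedDate("2018-02-15"))    // true
  }
}
```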
spark git commit: [SPARK-23457][SQL] Register task completion listeners first in ParquetFileFormat
Repository: spark
Updated Branches:
  refs/heads/master 3ee3b2ae1 -> f5850e789


[SPARK-23457][SQL] Register task completion listeners first in ParquetFileFormat

## What changes were proposed in this pull request?

ParquetFileFormat leaks opened files in some cases. This PR prevents that by registering the task completion listeners first, before initialization.

- [spark-branch-2.3-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/205/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
- [spark-master-test-sbt-hadoop-2.6](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4228/testReport/junit/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)

```
Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
	at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
	at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
	at
```

## How was this patch tested?

Manual. The following test case generates the same leakage.

```scala
  test("SPARK-23457 Register task completion listeners first in ParquetFileFormat") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
        Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }
```

Author: Dongjoon Hyun

Closes #20619 from dongjoon-hyun/SPARK-23390.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5850e78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5850e78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5850e78

Branch: refs/heads/master
Commit: f5850e78924d03448ad243cdd32b24c3fe0ea8af
Parents: 3ee3b2a
Author: Dongjoon Hyun
Authored: Tue Feb 20 13:33:03 2018 +0800
Committer: Wenchen Fan
Committed: Tue Feb 20 13:33:03 2018 +0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFileFormat.scala | 22 +---
 1 file changed, 10 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f5850e78/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index ba69f9a..476bd02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -395,16 +395,21 @@ class ParquetFileFormat
         ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
       }
       val taskContext = Option(TaskContext.get())
-      val parquetReader = if (enableVectorizedReader) {
+      if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader(
           convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+        val iter = new RecordReaderIterator(vectorizedReader)
+        // SPARK-23457 Register a task completion lister before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.
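Why registering the listener before `initialize()` matters: the reader may open the underlying file and then fail partway through initialization (for example with the OutOfMemoryError provoked by the test above), and only a close callback that was already registered will fire at task completion. Below is a minimal sketch of that ordering; `FakeTaskContext` and `LeakyReader` are hypothetical stand-ins, not Spark's TaskContext or Parquet reader classes.

```scala
import java.io.Closeable
import scala.collection.mutable.ArrayBuffer

// Toy stand-in for TaskContext: collects completion callbacks and runs them at the end.
class FakeTaskContext {
  private val listeners = ArrayBuffer.empty[() => Unit]
  def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
  def complete(): Unit = listeners.foreach(f => f())
}

// Toy stand-in for the vectorized reader: initialize() opens a resource and may then fail.
class LeakyReader extends Closeable {
  var opened = false
  def initialize(failDuringInit: Boolean): Unit = {
    opened = true // the underlying file handle is opened here
    if (failDuringInit) throw new RuntimeException("init failed after opening the file")
  }
  override def close(): Unit = opened = false
}

object RegisterListenerFirst {
  def main(args: Array[String]): Unit = {
    val ctx = new FakeTaskContext
    val reader = new LeakyReader
    // Register the close callback BEFORE initialize(), as the patch does: even if
    // initialization fails, task completion still closes the reader and the file.
    ctx.addTaskCompletionListener(() => reader.close())
    try reader.initialize(failDuringInit = true)
    catch { case _: RuntimeException => () }
    ctx.complete()
    println(s"reader still open after task completion? ${reader.opened}") // false
  }
}
```

If the callback were instead registered after `initialize()`, the failing call would skip registration entirely and the opened stream would never be closed, which is exactly the leak DebugFilesystem reports in the stack trace above.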
svn commit: r25154 - /dev/spark/v2.3.0-rc4-bin/spark-parent_2.11.iml
Author: sameerag
Date: Tue Feb 20 04:47:53 2018
New Revision: 25154

Log:
remove iml file

Removed:
    dev/spark/v2.3.0-rc4-bin/spark-parent_2.11.iml