spark git commit: [SPARK-23436][SQL] Infer partition as Date only if it can be casted to Date

2018-02-19 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master f5850e789 -> 651b0277f


[SPARK-23436][SQL] Infer partition as Date only if it can be casted to Date

## What changes were proposed in this pull request?

Before the patch, Spark could infer a partition value as Date even when it cannot be
cast to Date (this can happen when there are extra characters after a valid date,
like `2018-02-15AAA`).

When this happens and the input format has metadata defining the schema of the table,
`null` is returned as the value for the partition column, because the `cast` operator
used in `PartitioningAwareFileIndex.inferPartitioning` is unable to convert the value.

The PR makes partition inference check that values can actually be cast to Date or
Timestamp before inferring those data types for them.
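For context, a minimal sketch of the mismatch the patch guards against, contrasting the
lenient `DateFormat.parse` with the strict `Cast` used by `inferPartitioning`
(Spark-internal imports shown only for illustration):

```scala
import java.text.SimpleDateFormat
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.DateType

// parse() only consumes the valid leading prefix, so this succeeds ...
val parsed = new SimpleDateFormat("yyyy-MM-dd").parse("2018-02-15AAA")

// ... while casting the full string to DateType evaluates to null, which is
// why the patch refuses to infer DateType for such a value.
val casted = Cast(Literal("2018-02-15AAA"), DateType).eval()  // null
```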

## How was this patch tested?

added UT
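
The added suite itself is not visible below (the diff is cut off in this archive); as a
hedged illustration only, a check of this kind could look roughly like the following,
assuming a QueryTest-style suite where `spark`, `withTempPath`, `checkAnswer` and
`testImplicits` are in scope:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType
import testImplicits._

// Sketch only, not the committed test: a partition value that merely starts
// with a valid date must be inferred as a string, not as a (null) Date.
test("SPARK-23436 (sketch): trailing garbage keeps the partition column a string") {
  withTempPath { path =>
    Seq((1, "2018-02-15AAA")).toDF("id", "part")
      .write.partitionBy("part").parquet(path.getAbsolutePath)

    val readBack = spark.read.parquet(path.getAbsolutePath)
    // Inference falls back to StringType and the raw value is preserved,
    // instead of a DateType column whose values are null.
    assert(readBack.schema("part").dataType == StringType)
    checkAnswer(readBack.select("id", "part"), Row(1, "2018-02-15AAA"))
  }
}
```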

Author: Marco Gaido 

Closes #20621 from mgaido91/SPARK-23436.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/651b0277
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/651b0277
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/651b0277

Branch: refs/heads/master
Commit: 651b0277fe989119932d5ae1ef729c9768aa018d
Parents: f5850e7
Author: Marco Gaido 
Authored: Tue Feb 20 13:56:38 2018 +0800
Committer: Wenchen Fan 
Committed: Tue Feb 20 13:56:38 2018 +0800

--
 .../datasources/PartitioningUtils.scala | 40 +++-
 .../ParquetPartitionDiscoverySuite.scala| 14 +++
 2 files changed, 44 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/651b0277/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 472bf82..379acb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -407,6 +407,34 @@ object PartitioningUtils {
   Literal(bigDecimal)
 }
 
+    val dateTry = Try {
+      // try and parse the date, if no exception occurs this is a candidate to be resolved as
+      // DateType
+      DateTimeUtils.getThreadLocalDateFormat.parse(raw)
+      // SPARK-23436: Casting the string to date may still return null if a bad Date is provided.
+      // This can happen since DateFormat.parse may not use the entire text of the given string:
+      // so if there are extra characters after the date, it returns correctly.
+      // We need to check that we can cast the raw string since we later can use Cast to get
+      // the partition values with the right DataType (see
+      // org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning)
+      val dateValue = Cast(Literal(raw), DateType).eval()
+      // Disallow DateType if the cast returned null
+      require(dateValue != null)
+      Literal.create(dateValue, DateType)
+    }
+
+    val timestampTry = Try {
+      val unescapedRaw = unescapePathName(raw)
+      // try and parse the date, if no exception occurs this is a candidate to be resolved as
+      // TimestampType
+      DateTimeUtils.getThreadLocalTimestampFormat(timeZone).parse(unescapedRaw)
+      // SPARK-23436: see comment for date
+      val timestampValue = Cast(Literal(unescapedRaw), TimestampType, Some(timeZone.getID)).eval()
+      // Disallow TimestampType if the cast returned null
+      require(timestampValue != null)
+      Literal.create(timestampValue, TimestampType)
+    }
+
     if (typeInference) {
       // First tries integral types
       Try(Literal.create(Integer.parseInt(raw), IntegerType))
@@ -415,16 +443,8 @@ object PartitioningUtils {
         // Then falls back to fractional types
         .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
         // Then falls back to date/timestamp types
-        .orElse(Try(
-          Literal.create(
-            DateTimeUtils.getThreadLocalTimestampFormat(timeZone)
-              .parse(unescapePathName(raw)).getTime * 1000L,
-            TimestampType)))
-        .orElse(Try(
-          Literal.create(
-            DateTimeUtils.millisToDays(
-              DateTimeUtils.getThreadLocalDateFormat.parse(raw).getTime),
-            DateType)))
+        .orElse(timestampTry)
+        .orElse(dateTry)
         // Then falls back to string
         .getOrElse {
           if (raw == DEFAULT_PARTITION_NAME) {

http://git-wip-us.apache.org/repos/asf/spark/blob/651b0277/sql/core/src/test/scala/org/apache/spark/sql/execution/d

spark git commit: [SPARK-23457][SQL] Register task completion listeners first in ParquetFileFormat

2018-02-19 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 3ee3b2ae1 -> f5850e789


[SPARK-23457][SQL] Register task completion listeners first in ParquetFileFormat

## What changes were proposed in this pull request?

ParquetFileFormat leaks opened files in some cases. This PR prevents that by
registering task completion listeners first, before initialization.
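
The fix boils down to an ordering rule: register the close-on-completion callback before
running the initialization step that can throw. A simplified, hypothetical sketch of that
pattern (helper name and generic signature are illustrative, not the actual patch):

```scala
import org.apache.spark.TaskContext

// Hypothetical helper illustrating the ordering only.
def initWithCleanup[R <: AutoCloseable](reader: R)(init: R => Unit): R = {
  // 1. Register the cleanup callback first, while nothing can fail ...
  Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close()))
  // 2. ... then run the step that may throw (e.g. an OutOfMemoryError while
  //    allocating column batches); the listener still closes the reader at
  //    task completion, so the opened file is not leaked.
  init(reader)
  reader
}
```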

- [spark-branch-2.3-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/205/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
- [spark-master-test-sbt-hadoop-2.6](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4228/testReport/junit/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)

```
Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
    at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
    at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
    at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
    at
```

## How was this patch tested?

Manual. The following test case generates the same leakage.

```scala
  test("SPARK-23457 Register task completion listeners first in 
ParquetFileFormat") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> 
s"${Int.MaxValue}") {
  withTempDir { dir =>
val basePath = dir.getCanonicalPath
Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, 
"first").toString)
Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, 
"second").toString)
val df = spark.read.parquet(
  new Path(basePath, "first").toString,
  new Path(basePath, "second").toString)
val e = intercept[SparkException] {
  df.collect()
}
assert(e.getCause.isInstanceOf[OutOfMemoryError])
  }
}
  }
```

Author: Dongjoon Hyun 

Closes #20619 from dongjoon-hyun/SPARK-23390.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5850e78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5850e78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5850e78

Branch: refs/heads/master
Commit: f5850e78924d03448ad243cdd32b24c3fe0ea8af
Parents: 3ee3b2a
Author: Dongjoon Hyun 
Authored: Tue Feb 20 13:33:03 2018 +0800
Committer: Wenchen Fan 
Committed: Tue Feb 20 13:33:03 2018 +0800

--
 .../datasources/parquet/ParquetFileFormat.scala | 22 +---
 1 file changed, 10 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f5850e78/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index ba69f9a..476bd02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -395,16 +395,21 @@ class ParquetFileFormat
 
         ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
       }
       val taskContext = Option(TaskContext.get())
-      val parquetReader = if (enableVectorizedReader) {
+      if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader(
           convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+        val iter = new RecordReaderIterator(vectorizedReader)
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.

svn commit: r25154 - /dev/spark/v2.3.0-rc4-bin/spark-parent_2.11.iml

2018-02-19 Thread sameerag
Author: sameerag
Date: Tue Feb 20 04:47:53 2018
New Revision: 25154

Log:
remove iml file

Removed:
dev/spark/v2.3.0-rc4-bin/spark-parent_2.11.iml

