Repository: spark
Updated Branches:
  refs/heads/master 3ee3b2ae1 -> f5850e789


[SPARK-23457][SQL] Register task completion listeners first in ParquetFileFormat

## What changes were proposed in this pull request?

ParquetFileFormat leaks opened files in some cases. This PR prevents that by registering task completion listeners first, before initialization. The leak shows up in the following test reports:

- [spark-branch-2.3-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/205/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
- [spark-master-test-sbt-hadoop-2.6](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4228/testReport/junit/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)

```
Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
        at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
        at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
        at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
        at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
        at
```
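
In outline, the patch creates the `RecordReaderIterator` and registers the task completion listener before the reader is initialized, so cleanup still runs when initialization fails partway through. A simplified sketch of the vectorized path, using identifiers from `ParquetFileFormat.buildReaderWithPartitionValues` as shown in the diff below (values such as `split`, `capacity`, `convertTz` and `hadoopAttemptContext` come from the enclosing closure):

```scala
// Simplified sketch of the ordering this patch enforces (vectorized path).
val taskContext = Option(TaskContext.get())

val vectorizedReader = new VectorizedParquetRecordReader(
  convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
val iter = new RecordReaderIterator(vectorizedReader)

// SPARK-23457: wire up cleanup before anything that can fail after the
// Parquet file has been opened.
taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))

// initialize() opens the file; initBatch() then allocates a batch of
// `capacity` rows and can throw (e.g. the OutOfMemoryError provoked by the
// test below). Either way, the listener registered above still closes the
// underlying stream at task completion.
vectorizedReader.initialize(split, hadoopAttemptContext)
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
```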

## How was this patch tested?

Manually. The following test case reproduces the leak: setting the vectorized reader's batch size to `Int.MaxValue` makes batch allocation fail with an `OutOfMemoryError` after the Parquet file has already been opened, so the file is leaked unless the completion listener was registered beforehand.

```scala
  test("SPARK-23457 Register task completion listeners first in 
ParquetFileFormat") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> 
s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, 
"first").toString)
        Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, 
"second").toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }
```
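
For context, the leak is surfaced because the test filesystem records every stream it opens and the test harness checks afterwards that all of them were closed; that is what the `DebugFilesystem.addOpenStream` frame in the stack trace above refers to. A toy sketch of that bookkeeping, with hypothetical names (not Spark's actual `DebugFilesystem`):

```scala
// Toy illustration of open-stream tracking; names are hypothetical.
import java.io.Closeable
import scala.collection.mutable

object OpenStreamTracker {
  private val openStreams = mutable.Set.empty[Closeable]

  // Record a stream when it is opened and forget it when it is closed.
  def opened[T <: Closeable](stream: T): T = synchronized { openStreams += stream; stream }
  def closed(stream: Closeable): Unit = synchronized { openStreams -= stream }

  // Run after each test: anything still registered here was leaked.
  def assertNoOpenStreams(): Unit = synchronized {
    assert(openStreams.isEmpty, s"${openStreams.size} stream(s) were never closed")
  }
}
```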

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #20619 from dongjoon-hyun/SPARK-23390.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5850e78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5850e78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5850e78

Branch: refs/heads/master
Commit: f5850e78924d03448ad243cdd32b24c3fe0ea8af
Parents: 3ee3b2a
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Tue Feb 20 13:33:03 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Feb 20 13:33:03 2018 +0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFileFormat.scala | 22 +++++++++-----------
 1 file changed, 10 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f5850e78/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index ba69f9a..476bd02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -395,16 +395,21 @@ class ParquetFileFormat
         ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
       }
       val taskContext = Option(TaskContext.get())
-      val parquetReader = if (enableVectorizedReader) {
+      if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader(
           convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+        val iter = new RecordReaderIterator(vectorizedReader)
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.partitionValues)
         if (returningBatch) {
           vectorizedReader.enableReturningBatches()
         }
-        vectorizedReader
+
+        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns UnsafeRow
@@ -414,18 +419,11 @@ class ParquetFileFormat
         } else {
           new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
         }
+        val iter = new RecordReaderIterator(reader)
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
         reader.initialize(split, hadoopAttemptContext)
-        reader
-      }
 
-      val iter = new RecordReaderIterator(parquetReader)
-      taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
-
-      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-      if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
-          enableVectorizedReader) {
-        iter.asInstanceOf[Iterator[InternalRow]]
-      } else {
         val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
         val joinedRow = new JoinedRow()
         val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

