This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 680c1b6  [SPARK-27100][SQL][2.4] Use `Array` instead of `Seq` in 
`FilePartition` to prevent StackOverflowError
680c1b6 is described below

commit 680c1b6aa4ca7e4f8abb9261b8566b6f82e05d96
Author: Parth Chandra <par...@apple.com>
AuthorDate: Wed Jun 26 07:48:27 2019 +0000

    [SPARK-27100][SQL][2.4] Use `Array` instead of `Seq` in `FilePartition` to 
prevent StackOverflowError
    
    … prevent `StackOverflowError`
    
    ShuffleMapTask's partition field is a FilePartition and FilePartition's 
'files' field is a Stream$Cons, which is essentially a linked list. It is 
therefore serialized recursively.
    If the number of files in each partition is, say, 10000, recursing 
into a linked list of length 10000 overflows the stack.
    
    The problem occurs only with bucketed partitions. The corresponding 
implementation for non-bucketed partitions uses an ArrayBuffer (copied to an 
Array). The proposed change applies the same approach to bucketed partitions.
    
    Existing unit tests. Added new unit test. The unit test fails without the 
patch. Manual testing on dataset used to reproduce the problem.
    
    Closes #24957 from parthchandra/branch-2.4.
    
    Authored-by: Parth Chandra <par...@apple.com>
    Signed-off-by: DB Tsai <d_t...@apple.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 14 +++---
 .../sql/execution/datasources/FileScanRDD.scala    |  2 +-
 .../datasources/FileSourceStrategySuite.scala      |  2 +-
 .../spark/sql/sources/BucketedReadSuite.scala      | 50 ++++++++++++++++++++++
 4 files changed, 59 insertions(+), 9 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 5433c30..d22fe64 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -187,14 +187,14 @@ case class FileSourceScanExec(
 
   private var metadataTime = 0L
 
-  @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+  @transient private lazy val selectedPartitions: Array[PartitionDirectory] = {
     val optimizerMetadataTimeNs = 
relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + 
optimizerMetadataTimeNs) / 1000 / 1000
     metadataTime = timeTakenMs
     ret
-  }
+  }.toArray
 
   override lazy val (outputPartitioning, outputOrdering): (Partitioning, 
Seq[SortOrder]) = {
     val bucketSpec = if 
(relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -377,7 +377,7 @@ case class FileSourceScanExec(
   private def createBucketedReadRDD(
       bucketSpec: BucketSpec,
       readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: Seq[PartitionDirectory],
+      selectedPartitions: Array[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
@@ -402,7 +402,7 @@ case class FileSourceScanExec(
     }
 
     val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
-      FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, 
Nil))
+      FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, 
Array.empty))
     }
 
     new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
@@ -418,7 +418,7 @@ case class FileSourceScanExec(
    */
   private def createNonBucketedReadRDD(
       readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: Seq[PartitionDirectory],
+      selectedPartitions: Array[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes =
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
@@ -449,7 +449,7 @@ case class FileSourceScanExec(
             partition.values, file.getPath.toUri.toString, 0, file.getLen, 
hosts))
         }
       }
-    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
 
     val partitions = new ArrayBuffer[FilePartition]
     val currentFiles = new ArrayBuffer[PartitionedFile]
@@ -461,7 +461,7 @@ case class FileSourceScanExec(
         val newPartition =
           FilePartition(
             partitions.size,
-            currentFiles.toArray.toSeq) // Copy to a new Array.
+            currentFiles.toArray) // Copy to a new Array.
         partitions += newPartition
       }
       currentFiles.clear()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index ffea33c..0756e7e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -57,7 +57,7 @@ case class PartitionedFile(
  * A collection of file blocks that should be read as a single task
  * (possibly from multiple partitioned directories).
  */
-case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends 
RDDPartition
+case class FilePartition(index: Int, files: Array[PartitionedFile]) extends 
RDDPartition
 
 /**
  * An RDD that scans a list of file partitions.
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index bceaf1a..e8ccc20 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -279,7 +279,7 @@ class FileSourceStrategySuite extends QueryTest with 
SharedSQLContext with Predi
   }
 
   test("Locality support for FileScanRDD") {
-    val partition = FilePartition(0, Seq(
+    val partition = FilePartition(0, Array(
       PartitionedFile(InternalRow.empty, "fakePath0", 0, 10, Array("host0", 
"host1")),
       PartitionedFile(InternalRow.empty, "fakePath0", 10, 20, Array("host1", 
"host2")),
       PartitionedFile(InternalRow.empty, "fakePath1", 0, 5, Array("host3")),
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index a941420..6a4359b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -734,4 +734,54 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils {
         df1.groupBy("j").agg(max("k")))
     }
   }
+
+  //  A test with a partition where the number of files in the partition is
+  //  large. tests for the condition where the serialization of such a task 
may result in a stack
+  //  overflow if the files list is stored in a recursive data structure
+  //  This test is ignored because it takes long to run (~3 min)
+  ignore("SPARK-27100 stack overflow: read data with large partitions") {
+    val nCount = 20000
+    // reshuffle data so that many small files are created
+    val nShufflePartitions = 10000
+    // and with one table partition, should result in 10000 files in one 
partition
+    val nPartitions = 1
+    val nBuckets = 2
+    val dfPartitioned = (0 until nCount)
+      .map(i => (i % nPartitions, i % nBuckets, i.toString)).toDF("i", "j", 
"k")
+
+    // non-bucketed tables. This part succeeds without the fix for SPARK-27100
+    try {
+      withTable("non_bucketed_table") {
+        dfPartitioned.repartition(nShufflePartitions)
+          .write
+          .format("parquet")
+          .partitionBy("i")
+          .saveAsTable("non_bucketed_table")
+
+        val table = spark.table("non_bucketed_table")
+        val nValues = table.select("j", "k").count()
+        assert(nValues == nCount)
+      }
+    } catch {
+      case e: Exception => fail("Failed due to exception: " + e)
+    }
+    // bucketed tables. This fails without the fix for SPARK-27100
+    try {
+      withTable("bucketed_table") {
+        dfPartitioned.repartition(nShufflePartitions)
+          .write
+          .format("parquet")
+          .partitionBy("i")
+          .bucketBy(nBuckets, "j")
+          .saveAsTable("bucketed_table")
+
+        val table = spark.table("bucketed_table")
+        val nValues = table.select("j", "k").count()
+        assert(nValues == nCount)
+      }
+    } catch {
+      case e: Exception => fail("Failed due to exception: " + e)
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to