This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 680c1b6  [SPARK-27100][SQL][2.4] Use `Array` instead of `Seq` in `FilePartition` to prevent StackOverflowError
680c1b6 is described below

commit 680c1b6aa4ca7e4f8abb9261b8566b6f82e05d96
Author: Parth Chandra <par...@apple.com>
AuthorDate: Wed Jun 26 07:48:27 2019 +0000

    [SPARK-27100][SQL][2.4] Use `Array` instead of `Seq` in `FilePartition` to prevent StackOverflowError

    ShuffleMapTask's partition field is a FilePartition, and FilePartition's 'files' field is a
    Stream$cons, which is essentially a linked list and is therefore serialized recursively.
    If a partition contains, say, 10000 files, recursing into a linked list of length 10000
    overflows the stack.

    The problem occurs only for bucketed partitions. The corresponding implementation for
    non-bucketed partitions collects the files into an ArrayBuffer and copies them to an Array.
    The proposed change applies the same approach to bucketed partitions.

    Tested with existing unit tests and a new unit test that fails without the patch, plus
    manual testing on the dataset used to reproduce the problem.

    Closes #24957 from parthchandra/branch-2.4.

    Authored-by: Parth Chandra <par...@apple.com>
    Signed-off-by: DB Tsai <d_t...@apple.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala  | 14 +++---
 .../sql/execution/datasources/FileScanRDD.scala   |  2 +-
 .../datasources/FileSourceStrategySuite.scala     |  2 +-
 .../spark/sql/sources/BucketedReadSuite.scala     | 50 ++++++++++++++++++++++
 4 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 5433c30..d22fe64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -187,14 +187,14 @@ case class FileSourceScanExec(
 
   private var metadataTime = 0L
 
-  @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+  @transient private lazy val selectedPartitions: Array[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
     metadataTime = timeTakenMs
     ret
-  }
+  }.toArray
 
   override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -377,7 +377,7 @@ case class FileSourceScanExec(
   private def createBucketedReadRDD(
       bucketSpec: BucketSpec,
       readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: Seq[PartitionDirectory],
+      selectedPartitions: Array[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
@@ -402,7 +402,7 @@ case class FileSourceScanExec(
     }
 
     val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
-      FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Nil))
+      FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
     }
 
     new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
@@ -418,7 +418,7 @@ case class FileSourceScanExec(
    */
   private def createNonBucketedReadRDD(
       readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: Seq[PartitionDirectory],
+      selectedPartitions: Array[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes =
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
@@ -449,7 +449,7 @@ case class FileSourceScanExec(
             partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
         }
       }
-    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
 
     val partitions = new ArrayBuffer[FilePartition]
     val currentFiles = new ArrayBuffer[PartitionedFile]
@@ -461,7 +461,7 @@
         val newPartition =
           FilePartition(
             partitions.size,
-            currentFiles.toArray.toSeq) // Copy to a new Array.
+            currentFiles.toArray) // Copy to a new Array.
         partitions += newPartition
       }
       currentFiles.clear()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index ffea33c..0756e7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -57,7 +57,7 @@ case class PartitionedFile(
  * A collection of file blocks that should be read as a single task
  * (possibly from multiple partitioned directories).
  */
-case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends RDDPartition
+case class FilePartition(index: Int, files: Array[PartitionedFile]) extends RDDPartition
 
 /**
  * An RDD that scans a list of file partitions.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index bceaf1a..e8ccc20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -279,7 +279,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
   }
 
   test("Locality support for FileScanRDD") {
-    val partition = FilePartition(0, Seq(
+    val partition = FilePartition(0, Array(
       PartitionedFile(InternalRow.empty, "fakePath0", 0, 10, Array("host0", "host1")),
       PartitionedFile(InternalRow.empty, "fakePath0", 10, 20, Array("host1", "host2")),
       PartitionedFile(InternalRow.empty, "fakePath1", 0, 5, Array("host3")),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index a941420..6a4359b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -734,4 +734,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         df1.groupBy("j").agg(max("k")))
     }
   }
+
+  // A test with a partition where the number of files in the partition is
+  // large. tests for the condition where the serialization of such a task may result in a stack
+  // overflow if the files list is stored in a recursive data structure
+  // This test is ignored because it takes long to run (~3 min)
+  ignore("SPARK-27100 stack overflow: read data with large partitions") {
+    val nCount = 20000
+    // reshuffle data so that many small files are created
+    val nShufflePartitions = 10000
+    // and with one table partition, should result in 10000 files in one partition
+    val nPartitions = 1
+    val nBuckets = 2
+    val dfPartitioned = (0 until nCount)
+      .map(i => (i % nPartitions, i % nBuckets, i.toString)).toDF("i", "j", "k")
+
+    // non-bucketed tables. This part succeeds without the fix for SPARK-27100
+    try {
+      withTable("non_bucketed_table") {
+        dfPartitioned.repartition(nShufflePartitions)
+          .write
+          .format("parquet")
+          .partitionBy("i")
+          .saveAsTable("non_bucketed_table")
+
+        val table = spark.table("non_bucketed_table")
+        val nValues = table.select("j", "k").count()
+        assert(nValues == nCount)
+      }
+    } catch {
+      case e: Exception => fail("Failed due to exception: " + e)
+    }
+    // bucketed tables. This fails without the fix for SPARK-27100
+    try {
+      withTable("bucketed_table") {
+        dfPartitioned.repartition(nShufflePartitions)
+          .write
+          .format("parquet")
+          .partitionBy("i")
+          .bucketBy(nBuckets, "j")
+          .saveAsTable("bucketed_table")
+
+        val table = spark.table("bucketed_table")
+        val nValues = table.select("j", "k").count()
+        assert(nValues == nCount)
+      }
+    } catch {
+      case e: Exception => fail("Failed due to exception: " + e)
+    }
+  }
+
 }
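
The failure mode described in the commit message can be reproduced outside Spark with plain Java serialization. The sketch below is illustrative only and is not part of the patch: FakeFile, SeqPartition, ArrayPartition, javaSerialize and nFiles are hypothetical stand-ins for PartitionedFile, FilePartition and the task serialization path. An Array field is written element by element at a bounded stack depth, while a fully evaluated Stream is a chain of cons cells whose default serialization descends into the tail once per element, so for a large enough file list the Stream-backed case is expected to end in StackOverflowError (the exact threshold depends on the JVM stack size, -Xss).

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object RecursiveSerializationSketch {

  // Hypothetical stand-ins for PartitionedFile and FilePartition, for illustration only.
  case class FakeFile(path: String, start: Long, length: Long)
  case class SeqPartition(index: Int, files: Seq[FakeFile])
  case class ArrayPartition(index: Int, files: Array[FakeFile])

  // Plain Java serialization, roughly what happens when a task and its partition are serialized.
  def javaSerialize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bytes)
    try oos.writeObject(obj) finally oos.close()
    bytes.size
  }

  def main(args: Array[String]): Unit = {
    val nFiles = 100000
    val files = (0 until nFiles).map(i => FakeFile(s"file_$i", 0L, 128L))

    // Array-backed field: elements are written in a loop, stack depth stays bounded.
    println("array-backed: " + javaSerialize(ArrayPartition(0, files.toArray)) + " bytes")

    // Stream-backed field: each cons cell's serialization descends into its tail,
    // one group of stack frames per element, so this call is expected to throw
    // StackOverflowError for large nFiles (the threshold depends on -Xss).
    val streamFiles: Seq[FakeFile] = files.toStream.force
    println("stream-backed: " + javaSerialize(SeqPartition(0, streamFiles)) + " bytes")
  }
}

This is the same reason the patch copies the per-bucket file groups into an Array, as the non-bucketed code path already did, before constructing a FilePartition.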