Repository: spark
Updated Branches:
  refs/heads/master fc64e83f9 -> 81a305dd0
[SPARK-25753][CORE] fix reading small files via BinaryFileRDD

## What changes were proposed in this pull request?

This is a follow-up of #21601: `StreamFileInputFormat` and `WholeTextFileInputFormat` have the same problem. When the minimum split size per node or per rack configured for the Hadoop job is larger than the maximum split size computed for small files, reading them fails with:

    java.io.IOException: Minimum split size pernode 5123456 cannot be larger than maximum split size 4194304
      at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:201)
      at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:52)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:254)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
      at scala.Option.getOrElse(Option.scala:121)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2138)

## How was this patch tested?

Added a unit test.

Closes #22725 from 10110346/maxSplitSize_node_rack.

Authored-by: liuxian <liu.xi...@zte.com.cn>
Signed-off-by: Thomas Graves <tgra...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81a305dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81a305dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81a305dd

Branch: refs/heads/master
Commit: 81a305dd0418f6e0136b4e38ffe91e0b76c8806e
Parents: fc64e83
Author: liuxian <liu.xi...@zte.com.cn>
Authored: Mon Oct 22 08:53:18 2018 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Mon Oct 22 08:53:18 2018 -0500

----------------------------------------------------------------------
 .../org/apache/spark/input/PortableDataStream.scala  | 12 ++++++++++++
 core/src/test/scala/org/apache/spark/FileSuite.scala | 13 +++++++++++++
 2 files changed, 25 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/81a305dd/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index ab020aa..5b33c11 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -52,6 +52,18 @@ private[spark] abstract class StreamFileInputFormat[T]
     val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
     val bytesPerCore = totalBytes / defaultParallelism
     val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val jobConfig = context.getConfiguration
+    val minSplitSizePerNode = jobConfig.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = jobConfig.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
+    }
+    if (maxSplitSize < minSplitSizePerRack) {
+      super.setMinSplitSizeRack(maxSplitSize)
+    }
     super.setMaxSplitSize(maxSplitSize)
   }
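
For readers following the hunk above, here is a minimal standalone sketch of the split-size arithmetic it guards. The object and method names are made up for the example, and the 128 MB / 4 MB defaults are assumptions standing in for spark.files.maxPartitionBytes and spark.files.openCostInBytes; they are not part of the patch. With tiny inputs, bytesPerCore is negligible, so maxSplitSize collapses to the open cost (the 4194304 in the stack trace above), and any larger per-node or per-rack minimum has to be clamped down to it:

object SplitSizeSketch {
  // Mirrors the computation in StreamFileInputFormat.setMinPartitions plus the
  // clamp added by this patch; returns (maxSplitSize, minPerNode, minPerRack).
  def clampedSplitSizes(
      totalBytes: Long,
      defaultParallelism: Int,
      minSplitSizePerNode: Long,
      minSplitSizePerRack: Long,
      defaultMaxSplitBytes: Long = 128L * 1024 * 1024, // assumed default
      openCostInBytes: Long = 4L * 1024 * 1024): (Long, Long, Long) = { // assumed default
    val bytesPerCore = totalBytes / defaultParallelism
    val maxSplitSize = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
    // Clamp so CombineFileInputFormat.getSplits never sees a minimum above the maximum.
    (maxSplitSize,
      math.min(minSplitSizePerNode, maxSplitSize),
      math.min(minSplitSizePerRack, maxSplitSize))
  }

  def main(args: Array[String]): Unit = {
    // A handful of tiny files with the per-node/per-rack minimums from the bug report.
    val (max, perNode, perRack) = clampedSplitSizes(
      totalBytes = 5L, defaultParallelism = 2,
      minSplitSizePerNode = 5123456L, minSplitSizePerRack = 5123456L)
    // Prints: maxSplitSize=4194304 minPerNode=4194304 minPerRack=4194304
    println(s"maxSplitSize=$max minPerNode=$perNode minPerRack=$perRack")
  }
}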
http://git-wip-us.apache.org/repos/asf/spark/blob/81a305dd/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 81b18c7..34efcdf 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -320,6 +320,19 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }
   }

+  test("minimum split size per node and per rack should be less than or equal to maxSplitSize") {
+    sc = new SparkContext("local", "test")
+    val testOutput = Array[Byte](1, 2, 3, 4, 5)
+    val outFile = writeBinaryData(testOutput, 1)
+    sc.hadoopConfiguration.setLong(
+      "mapreduce.input.fileinputformat.split.minsize.per.node", 5123456)
+    sc.hadoopConfiguration.setLong(
+      "mapreduce.input.fileinputformat.split.minsize.per.rack", 5123456)
+
+    val (_, data) = sc.binaryFiles(outFile.getAbsolutePath).collect().head
+    assert(data.toArray === testOutput)
+  }
+
   test("fixed record length binary file as byte array") {
     sc = new SparkContext("local", "test")
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
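
To make the user-facing effect concrete, here is a sketch of the scenario the new test mirrors; the application name and the input path are placeholders invented for the example. Before this patch, the binaryFiles read below failed during partition planning with the IOException quoted in the description:

import org.apache.spark.{SparkConf, SparkContext}

object SmallBinaryFilesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("small-binary-files"))
    // Per-node/per-rack minimums larger than the ~4 MB maxSplitSize computed for tiny files.
    sc.hadoopConfiguration.setLong(
      "mapreduce.input.fileinputformat.split.minsize.per.node", 5123456L)
    sc.hadoopConfiguration.setLong(
      "mapreduce.input.fileinputformat.split.minsize.per.rack", 5123456L)
    // "/tmp/small-files" is a placeholder; any directory of small binary files will do.
    val sizes = sc.binaryFiles("/tmp/small-files")
      .mapValues(_.toArray().length)
      .collect()
    sizes.foreach { case (path, len) => println(s"$path -> $len bytes") }
    sc.stop()
  }
}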