Repository: spark
Updated Branches:
  refs/heads/master fc64e83f9 -> 81a305dd0


[SPARK-25753][CORE] fix reading small files via BinaryFileRDD

## What changes were proposed in this pull request?

This is a follow-up of #21601; `StreamFileInputFormat` and `WholeTextFileInputFormat` have the same problem: when reading small files, a configured minimum split size per node or per rack can exceed the computed maximum split size, and `CombineFileInputFormat.getSplits` then fails with:

```
Minimum split size pernode 5123456 cannot be larger than maximum split size 4194304
java.io.IOException: Minimum split size pernode 5123456 cannot be larger than maximum split size 4194304
        at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:201)
        at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:52)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:254)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2138)
```

## How was this patch tested?
Added a unit test

Closes #22725 from 10110346/maxSplitSize_node_rack.

Authored-by: liuxian <liu.xi...@zte.com.cn>
Signed-off-by: Thomas Graves <tgra...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81a305dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81a305dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81a305dd

Branch: refs/heads/master
Commit: 81a305dd0418f6e0136b4e38ffe91e0b76c8806e
Parents: fc64e83
Author: liuxian <liu.xi...@zte.com.cn>
Authored: Mon Oct 22 08:53:18 2018 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Mon Oct 22 08:53:18 2018 -0500

----------------------------------------------------------------------
 .../org/apache/spark/input/PortableDataStream.scala    | 12 ++++++++++++
 core/src/test/scala/org/apache/spark/FileSuite.scala   | 13 +++++++++++++
 2 files changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/81a305dd/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index ab020aa..5b33c11 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -52,6 +52,18 @@ private[spark] abstract class StreamFileInputFormat[T]
     val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
     val bytesPerCore = totalBytes / defaultParallelism
     val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val jobConfig = context.getConfiguration
+    val minSplitSizePerNode = jobConfig.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = jobConfig.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
+    }
+    if (maxSplitSize < minSplitSizePerRack) {
+      super.setMinSplitSizeRack(maxSplitSize)
+    }
     super.setMaxSplitSize(maxSplitSize)
   }
 

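For context, a small worked sketch (not part of this patch) of where the 4194304 in the error comes from, assuming Spark's default open cost (4 MB, `spark.files.openCostInBytes`) and max partition bytes (128 MB, `spark.files.maxPartitionBytes`) and a tiny total input:

```scala
// Assumed defaults: spark.files.maxPartitionBytes = 128 MB, spark.files.openCostInBytes = 4 MB.
val defaultMaxSplitBytes = 128L * 1024 * 1024
val openCostInBytes = 4L * 1024 * 1024
// With only a few small files, totalBytes / defaultParallelism stays tiny.
val bytesPerCore = 5L
val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
// maxSplitSize == 4194304, so a configured per-node/per-rack minimum of 5123456
// exceeds it and CombineFileInputFormat.getSplits rejects the configuration;
// the change above caps both minimums at maxSplitSize before setMaxSplitSize is called.
```
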
http://git-wip-us.apache.org/repos/asf/spark/blob/81a305dd/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 81b18c7..34efcdf 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -320,6 +320,19 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("minimum split size per node and per rack should be less than or equal 
to maxSplitSize") {
+    sc = new SparkContext("local", "test")
+    val testOutput = Array[Byte](1, 2, 3, 4, 5)
+    val outFile = writeBinaryData(testOutput, 1)
+    sc.hadoopConfiguration.setLong(
+      "mapreduce.input.fileinputformat.split.minsize.per.node", 5123456)
+    sc.hadoopConfiguration.setLong(
+      "mapreduce.input.fileinputformat.split.minsize.per.rack", 5123456)
+
+    val (_, data) = sc.binaryFiles(outFile.getAbsolutePath).collect().head
+    assert(data.toArray === testOutput)
+  }
+
   test("fixed record length binary file as byte array") {
     sc = new SparkContext("local", "test")
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)


