Repository: spark
Updated Branches:
  refs/heads/branch-2.1 4cb4e5ff0 -> c8879bf1e


[SPARK-16575][CORE] partition calculation mismatch with sc.binaryFiles

## What changes were proposed in this pull request?

This pull request contains the changes for the critical bug SPARK-16575. It fixes 
the BinaryFileRDD partition calculation: previously, an RDD created with 
sc.binaryFiles always ended up with just two partitions, regardless of the size 
or number of input files.
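
For illustration, a minimal sketch of the behaviour in question (the master setting and input path are placeholders, not from this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch; the master and input path are placeholders.
val sc = new SparkContext(new SparkConf().setAppName("binaryFiles-check").setMaster("local[4]"))
val rdd = sc.binaryFiles("/path/to/a/large/directory/of/binary/files")
// Before this change the count here was always 2; with the change it follows the
// split size derived from spark.files.maxPartitionBytes and spark.files.openCostInBytes.
println(rdd.getNumPartitions)
```
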
## How was this patch tested?

The original issue, i.e. getNumPartitions on a binary-files RDD always returning 
two partitions, was first reproduced, and the fix was then verified against it. 
The existing unit tests were also run and pass.

This contribution is my original work and I license the work to the project 
under the project's open source license.

srowen hvanhovell rxin vanzin skyluc kmader zsxwing datafarmer Please have a look.

Author: fidato <fidato.jul...@gmail.com>

Closes #15327 from fidato13/SPARK-16575.

(cherry picked from commit 6f3697136aa68dc39d3ce42f43a7af554d2a3bf9)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8879bf1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8879bf1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8879bf1

Branch: refs/heads/branch-2.1
Commit: c8879bf1ee2af9ccd5d5656571d931d2fc1da024
Parents: 4cb4e5f
Author: fidato <fidato.jul...@gmail.com>
Authored: Mon Nov 7 18:41:17 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Nov 7 18:41:29 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/input/PortableDataStream.scala | 14 +++++++++++---
 .../org/apache/spark/internal/config/package.scala  | 13 +++++++++++++
 .../scala/org/apache/spark/rdd/BinaryFileRDD.scala  |  4 ++--
 docs/configuration.md                               | 16 ++++++++++++++++
 4 files changed, 42 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c8879bf1/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index f66510b..59404e0 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -27,6 +27,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}
 
+import org.apache.spark.internal.config
+import org.apache.spark.SparkContext
+
 /**
  * A general format for reading whole files in as streams, byte arrays,
  * or other functions to be added
@@ -40,9 +43,14 @@ private[spark] abstract class StreamFileInputFormat[T]
    * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API
    * which is set through setMaxSplitSize
    */
-  def setMinPartitions(context: JobContext, minPartitions: Int) {
-    val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
-    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
+  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
+    val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
+    val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
+    val defaultParallelism = sc.defaultParallelism
+    val files = listStatus(context).asScala
+    val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+    val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
     super.setMaxSplitSize(maxSplitSize)
   }
 

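For context (not part of the commit), a small worked example of the split-size formula introduced above, using the default config values and made-up input sizes:

```scala
// Worked example of the formula above; all numbers are illustrative.
val defaultMaxSplitBytes = 128L * 1024 * 1024        // spark.files.maxPartitionBytes default
val openCostInBytes      = 4L * 1024 * 1024          // spark.files.openCostInBytes default
val defaultParallelism   = 8                         // assumed cluster parallelism
val totalBytes = 10L * 1024 * 1024 * 1024 + 100 * openCostInBytes  // ~10 GB across 100 files
val bytesPerCore = totalBytes / defaultParallelism   // ~1.3 GB per core
val maxSplitSize = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
// maxSplitSize is 128 MB here, so the input is split into roughly totalBytes / 128 MB
// partitions instead of the fixed two partitions produced before this change.
```
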
http://git-wip-us.apache.org/repos/asf/spark/blob/c8879bf1/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 497ca92..4a3e3d5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -206,4 +206,17 @@ package object config {
       "encountering corrupt files and contents that have been read will still be returned.")
     .booleanConf
     .createWithDefault(false)
+
+  private[spark] val FILES_MAX_PARTITION_BYTES = ConfigBuilder("spark.files.maxPartitionBytes")
+    .doc("The maximum number of bytes to pack into a single partition when reading files.")
+    .longConf
+    .createWithDefault(128 * 1024 * 1024)
+
+  private[spark] val FILES_OPEN_COST_IN_BYTES = ConfigBuilder("spark.files.openCostInBytes")
+    .doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" +
+      " the same time. This is used when putting multiple files into a partition. It's better to" +
+      " over estimate, then the partitions with small files will be faster than partitions with" +
+      " bigger files.")
+      " bigger files.")
+    .longConf
+    .createWithDefault(4 * 1024 * 1024)
 }

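As an aside (not part of the commit), a sketch of how these two new properties might be tuned from user code; the application name, values, and input path are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: tuning the new properties on a SparkConf (values are illustrative).
val conf = new SparkConf()
  .setAppName("binary-files-example")
  .set("spark.files.maxPartitionBytes", (64 * 1024 * 1024).toString)  // at most ~64 MB per partition
  .set("spark.files.openCostInBytes", (2 * 1024 * 1024).toString)     // assume ~2 MB cost per file open

val sc = new SparkContext(conf)
val streams = sc.binaryFiles("/data/binary")   // hypothetical path
println(streams.getNumPartitions)
```
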
http://git-wip-us.apache.org/repos/asf/spark/blob/c8879bf1/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 41832e8..50d977a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{Partition, SparkContext}
 import org.apache.spark.input.StreamFileInputFormat
 
 private[spark] class BinaryFileRDD[T](
-    sc: SparkContext,
+    @transient private val sc: SparkContext,
     inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
     keyClass: Class[String],
     valueClass: Class[T],
@@ -43,7 +43,7 @@ private[spark] class BinaryFileRDD[T](
       case _ =>
     }
     val jobContext = new JobContextImpl(conf, jobId)
-    inputFormat.setMinPartitions(jobContext, minPartitions)
+    inputFormat.setMinPartitions(sc, jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {

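As background on the @transient annotation above (a general Scala/Spark note, not specific to this patch): a SparkContext is not serializable, so a class that is shipped to executors keeps its reference to it @transient and only uses it on the driver, for example:

```scala
import org.apache.spark.SparkContext

// General pattern (sketch, hypothetical class): keep a driver-side-only SparkContext
// reference out of the serialized object by marking it @transient.
class PartitionPlanner(@transient private val sc: SparkContext) extends Serializable {
  // Safe: called on the driver while planning partitions, before serialization.
  def plannedParallelism: Int = sc.defaultParallelism
}
```
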
http://git-wip-us.apache.org/repos/asf/spark/blob/c8879bf1/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 0017219..d0acd94 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1035,6 +1035,22 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.files.maxPartitionBytes</code></td>
+  <td>134217728 (128 MB)</td>
+  <td>
+    The maximum number of bytes to pack into a single partition when reading files.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.files.openCostInBytes</code></td>
+  <td>4194304 (4 MB)</td>
+  <td>
+    The estimated cost to open a file, measured by the number of bytes could be scanned in the same
+    time. This is used when putting multiple files into a partition. It is better to over estimate,
+    then the partitions with small files will be faster than partitions with bigger files.
+  </td>
+</tr>
+<tr>
     <td><code>spark.hadoop.cloneConf</code></td>
     <td>false</td>
     <td>If set to true, clones a new Hadoop <code>Configuration</code> object for each task.  This

