Repository: spark
Updated Branches:
  refs/heads/master 103854028 -> bbbf81469


[SPARK-22357][CORE] SparkContext.binaryFiles ignore minPartitions parameter

## What changes were proposed in this pull request?
Fix the issue that minPartitions was not used in the method. This is a simple 
fix, and I have deliberately kept it uncomplicated. The purpose is to let the 
user raise the defaultParallelism used in the method through the value of 
minPartitions, while still honoring sc.defaultParallelism: the larger of the 
two values is used.
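
As a rough illustration of the effect, the sketch below reproduces only the 
arithmetic visible in the diff, without any Spark dependency. The object name 
`SplitSizeSketch`, the `useMinPartitions` flag, the file sizes, and the open 
cost are hypothetical values chosen for the example; how `bytesPerCore` is used 
afterwards is not shown in this commit, so the sketch stops there.

```scala
// Illustrative stand-in for the arithmetic in setMinPartitions; not Spark code.
object SplitSizeSketch {
  // All sizes are in bytes. Parameter names mirror the vals in the diff;
  // useMinPartitions is a hypothetical switch between old and new behavior.
  def bytesPerCore(
      fileSizes: Seq[Long],
      openCostInBytes: Long,
      scDefaultParallelism: Int,
      minPartitions: Int,
      useMinPartitions: Boolean): Long = {
    val defaultParallelism =
      if (useMinPartitions) math.max(scDefaultParallelism, minPartitions) // after the fix
      else scDefaultParallelism                                           // before the fix
    // Each file is charged its length plus the per-file open cost.
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    totalBytes / defaultParallelism
  }

  def main(args: Array[String]): Unit = {
    val sizes = Seq.fill(8)(1000L) // eight hypothetical 1000-byte files
    val before = bytesPerCore(sizes, 24L, 2, 8, useMinPartitions = false)
    val after = bytesPerCore(sizes, 24L, 2, 8, useMinPartitions = true)
    println(s"before=$before after=$after") // prints "before=4096 after=1024"
  }
}
```

With a default parallelism of 2 and minPartitions of 8, the per-core byte 
budget in this example drops from 4096 to 1024. When minPartitions is smaller 
than sc.defaultParallelism, both variants give the same result.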

## How was this patch tested?
I have not provided an additional test, since the fix is very straightforward.

Closes #21638 from bomeng/22357.

Lead-authored-by: Bo Meng <men...@hotmail.com>
Co-authored-by: Bo Meng <bo.m...@jd.com>
Signed-off-by: Sean Owen <sean.o...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bbbf8146
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bbbf8146
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bbbf8146

Branch: refs/heads/master
Commit: bbbf8146916aa70d9774543643776eed9d9d9373
Parents: 1038540
Author: Bo Meng <men...@hotmail.com>
Authored: Tue Aug 28 19:39:13 2018 -0500
Committer: Sean Owen <sean.o...@databricks.com>
Committed: Tue Aug 28 19:39:13 2018 -0500

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/input/PortableDataStream.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bbbf8146/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 17cdba4..ab020aa 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -47,7 +47,7 @@ private[spark] abstract class StreamFileInputFormat[T]
   def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
     val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
     val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
-    val defaultParallelism = sc.defaultParallelism
+    val defaultParallelism = Math.max(sc.defaultParallelism, minPartitions)
     val files = listStatus(context).asScala
     val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
     val bytesPerCore = totalBytes / defaultParallelism


