Repository: spark
Updated Branches:
  refs/heads/master 103854028 -> bbbf81469

[SPARK-22357][CORE] SparkContext.binaryFiles ignore minPartitions parameter

## What changes were proposed in this pull request?
Fix the issue that minPartitions was not used in the method. This is a simple fix and I am not trying to make it complicated. The purpose is to still allow users to control defaultParallelism through the value of minPartitions, while also honoring sc.defaultParallelism.

## How was this patch tested?
I have not provided an additional test since the fix is very straightforward.

Closes #21638 from bomeng/22357.

Lead-authored-by: Bo Meng <men...@hotmail.com>
Co-authored-by: Bo Meng <bo.m...@jd.com>
Signed-off-by: Sean Owen <sean.o...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bbbf8146
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bbbf8146
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bbbf8146

Branch: refs/heads/master
Commit: bbbf8146916aa70d9774543643776eed9d9d9373
Parents: 1038540
Author: Bo Meng <men...@hotmail.com>
Authored: Tue Aug 28 19:39:13 2018 -0500
Committer: Sean Owen <sean.o...@databricks.com>
Committed: Tue Aug 28 19:39:13 2018 -0500

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/input/PortableDataStream.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bbbf8146/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 17cdba4..ab020aa 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -47,7 +47,7 @@ private[spark] abstract class StreamFileInputFormat[T]
   def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
     val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
     val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
-    val defaultParallelism = sc.defaultParallelism
+    val defaultParallelism = Math.max(sc.defaultParallelism, minPartitions)
     val files = listStatus(context).asScala
     val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
     val bytesPerCore = totalBytes / defaultParallelism
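
To make the effect of the one-line change concrete, here is a minimal standalone sketch of the arithmetic visible in the hunk above. It does not depend on Spark: the object name, the helper methods, and the sample file sizes are hypothetical and chosen only for illustration, and the code stops at bytesPerCore because the diff does not show how that value is used afterwards.

```scala
// Standalone sketch of the arithmetic in the hunk; not Spark code.
// BytesPerCoreSketch and its methods are hypothetical names for illustration.
object BytesPerCoreSketch {

  /** Mirrors `files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum`. */
  def totalBytes(fileLengths: Seq[Long], openCostInBytes: Long): Long =
    fileLengths.map(_ + openCostInBytes).sum

  /** Before the patch: only defaultParallelism is the divisor. */
  def bytesPerCoreBefore(total: Long, defaultParallelism: Int): Long =
    total / defaultParallelism

  /** After the patch: the larger of defaultParallelism and minPartitions is the divisor. */
  def bytesPerCoreAfter(total: Long, defaultParallelism: Int, minPartitions: Int): Long =
    total / Math.max(defaultParallelism, minPartitions)

  def main(args: Array[String]): Unit = {
    val mib = 1024L * 1024L
    // Assumed sample input: ten 1 MiB files with a 4 MiB open cost per file.
    val total = totalBytes(Seq.fill(10)(1 * mib), openCostInBytes = 4 * mib)

    println(bytesPerCoreBefore(total, defaultParallelism = 4))                    // 13107200
    println(bytesPerCoreAfter(total, defaultParallelism = 4, minPartitions = 16)) // 3276800
  }
}
```

With these assumed inputs, requesting minPartitions = 16 on a context whose defaultParallelism is 4 cuts bytesPerCore to a quarter of its previous value. When minPartitions is smaller than sc.defaultParallelism, the result is unchanged from the old behavior.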