[GitHub] spark pull request #22723: [SPARK-25729][CORE]It is better to replace `minPa...
Github user 10110346 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22723#discussion_r230579427

    --- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
    @@ -48,11 +50,11 @@ private[spark] class WholeTextFileInputFormat
        * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API,
        * which is set through setMaxSplitSize
        */
    -  def setMinPartitions(context: JobContext, minPartitions: Int) {
    +  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
    --- End diff --

    Ok, thanks
Github user 10110346 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22723#discussion_r230579423

    --- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
    @@ -48,11 +50,11 @@ private[spark] class WholeTextFileInputFormat
        * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API,
        * which is set through setMaxSplitSize
        */
    -  def setMinPartitions(context: JobContext, minPartitions: Int) {
    +  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
         val files = listStatus(context).asScala
         val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
    -    val maxSplitSize = Math.ceil(totalLen * 1.0 /
    -      (if (minPartitions == 0) 1 else minPartitions)).toLong
    +    val minPartNum = Math.max(sc.defaultParallelism, minPartitions)
    +    val maxSplitSize = Math.ceil(totalLen * 1.0 / minPartNum).toLong
    --- End diff --

    Thanks, I have changed the description. I think the number of partitions belongs to the input format, and it can also be used by other RDDs.
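To make the old and new formulas in this diff concrete, here is a minimal, self-contained sketch of the split-size arithmetic outside of the real `WholeTextFileInputFormat`. The file sizes and parallelism below are hypothetical stand-ins, not values from the PR:

```scala
object SplitSizeSketch {

  // Old formula: divide total input size by the user's minPartitions alone.
  def oldMaxSplitSize(totalLen: Long, minPartitions: Int): Long =
    Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong

  // New formula: never target fewer partitions than defaultParallelism.
  def newMaxSplitSize(totalLen: Long, defaultParallelism: Int, minPartitions: Int): Long = {
    val minPartNum = Math.max(defaultParallelism, minPartitions)
    Math.ceil(totalLen * 1.0 / minPartNum).toLong
  }

  def main(args: Array[String]): Unit = {
    val totalLen = 1024L * 1024 * 1024  // 1 GB of whole-text files (hypothetical)
    val defaultParallelism = 8          // e.g. an 8-core local cluster
    val minPartitions = 2               // user-supplied hint

    println(oldMaxSplitSize(totalLen, minPartitions))                      // ~512 MB => ~2 splits
    println(newMaxSplitSize(totalLen, defaultParallelism, minPartitions))  // ~128 MB => ~8 splits
  }
}
```

With the old formula this job would form roughly two large splits, leaving six of the eight cores idle; the new formula caps the split size so that at least `defaultParallelism` splits can be formed.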
Github user 10110346 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22723#discussion_r230579084

    --- Diff: core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala ---
    @@ -51,7 +51,7 @@ private[spark] class WholeTextFileRDD(
           case _ =>
         }
         val jobContext = new JobContextImpl(conf, jobId)
    -    inputFormat.setMinPartitions(jobContext, minPartitions)
    +    inputFormat.setMinPartitions(sc, jobContext, minPartitions)
    --- End diff --

    Yeah, thanks
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22723#discussion_r229720799

    --- Diff: core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala ---
    @@ -51,7 +51,7 @@ private[spark] class WholeTextFileRDD(
           case _ =>
         }
         val jobContext = new JobContextImpl(conf, jobId)
    -    inputFormat.setMinPartitions(jobContext, minPartitions)
    +    inputFormat.setMinPartitions(sc, jobContext, minPartitions)
    --- End diff --

    You don't need to pass the context. You just need to pass the one value you're going to use from it.
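A sketch of the narrower API this suggestion points at; the parameter name `defaultParallelism` and the trait are illustrative, not taken from the PR:

```scala
import org.apache.hadoop.mapreduce.JobContext

// Hypothetical shape of the suggested API: accept only the value that is
// actually used, rather than the whole SparkContext.
trait MinPartitionsSetter {
  def setMinPartitions(defaultParallelism: Int, context: JobContext, minPartitions: Int): Unit
}

// The call site in WholeTextFileRDD would then read, roughly:
//   inputFormat.setMinPartitions(sc.defaultParallelism, jobContext, minPartitions)
```

Passing only the value keeps the input format decoupled from `SparkContext`, which is the usual argument for this kind of signature.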
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22723#discussion_r229721018

    --- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
    @@ -48,11 +50,11 @@ private[spark] class WholeTextFileInputFormat
        * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API,
        * which is set through setMaxSplitSize
        */
    -  def setMinPartitions(context: JobContext, minPartitions: Int) {
    +  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
         val files = listStatus(context).asScala
         val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
    -    val maxSplitSize = Math.ceil(totalLen * 1.0 /
    -      (if (minPartitions == 0) 1 else minPartitions)).toLong
    +    val minPartNum = Math.max(sc.defaultParallelism, minPartitions)
    +    val maxSplitSize = Math.ceil(totalLen * 1.0 / minPartNum).toLong
    --- End diff --

    Yes, this no longer matches the title or JIRA. I'm also not clear on the argument for why this is better.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22723#discussion_r229717747

    --- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
    @@ -48,11 +50,11 @@ private[spark] class WholeTextFileInputFormat
        * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API,
        * which is set through setMaxSplitSize
        */
    -  def setMinPartitions(context: JobContext, minPartitions: Int) {
    +  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
    --- End diff --

    Please update the above comment to explain the new behavior.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22723#discussion_r229717581

    --- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
    @@ -48,11 +50,11 @@ private[spark] class WholeTextFileInputFormat
        * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API,
        * which is set through setMaxSplitSize
        */
    -  def setMinPartitions(context: JobContext, minPartitions: Int) {
    +  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
         val files = listStatus(context).asScala
         val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
    -    val maxSplitSize = Math.ceil(totalLen * 1.0 /
    -      (if (minPartitions == 0) 1 else minPartitions)).toLong
    +    val minPartNum = Math.max(sc.defaultParallelism, minPartitions)
    --- End diff --

    This is potentially a behavior change. cc @cloud-fan
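For a concrete picture of the behavior change at the user-facing API, here is a hedged sketch; the master URL, input path, and sizes are hypothetical placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BehaviorChangeSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical 8-core local run; the input path is a placeholder.
    val sc = new SparkContext(new SparkConf().setMaster("local[8]").setAppName("sketch"))
    try {
      val rdd = sc.wholeTextFiles("/data/many-small-files", minPartitions = 2)
      // Before the patch: the split size targets ~2 partitions, honoring the hint.
      // After the patch: it targets at least sc.defaultParallelism (8 here),
      // assuming there are enough bytes and files to fill that many splits.
      println(rdd.getNumPartitions)
    } finally {
      sc.stop()
    }
  }
}
```

A caller who deliberately chose a small `minPartitions` would silently get more partitions after this change, which is why it is flagged as a behavior change rather than a pure optimization.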
GitHub user 10110346 opened a pull request:

    https://github.com/apache/spark/pull/22723

    [SPARK-25729][CORE] It is better to replace `minPartitions` with `defaultParallelism`, when `minPartitions` is less than `defaultParallelism`

    ## What changes were proposed in this pull request?

    In `WholeTextFileRDD`, when `minPartitions` is less than `defaultParallelism`, it is better to replace `minPartitions` with `defaultParallelism`, because this makes better use of resources and improves concurrency.

    ## How was this patch tested?

    Added a unit test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/10110346/spark minPartNum

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22723.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22723

----

commit b80bf66a8109faa7f58d45b92417a981666866a0
Author: liuxian
Date:   2018-10-15T06:39:17Z

    fix
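The unit test itself is not shown in this thread; a hedged sketch of what such a test might look like follows. The suite name, test directory, and file layout are hypothetical, not the PR's actual test code:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite

// Hypothetical test shape. It checks the new lower bound on the partition
// count, assuming the test harness has written enough small files under the
// given directory for at least defaultParallelism splits to form.
class WholeTextFilesParallelismSuite extends FunSuite {
  test("wholeTextFiles uses defaultParallelism when minPartitions is smaller") {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    try {
      val rdd = sc.wholeTextFiles("target/test-small-files", minPartitions = 1)
      assert(rdd.getNumPartitions >= sc.defaultParallelism)
    } finally {
      sc.stop()
    }
  }
}
```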