If you don't mind using the newest version, you could try the v2.0-preview:
http://spark.apache.org/news/spark-2.0.0-preview.html

There, you can control the number of input partitions without shuffles via the
two parameters below (not documented yet, though):

spark.sql.files.maxPartitionBytes
spark.sql.files.openCostInBytes
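Note that these settings apply to the SQL/Dataset file sources (e.g.
spark.read.text), not to the RDD-based sc.textFile. A minimal spark-shell
sketch of how you could set them (the values are illustrative only, and it
assumes the 2.0 shell exposes the SparkSession as `spark`):

scala> // target maximum bytes packed into one input partition (default: 128 MB)
scala> spark.conf.set("spark.sql.files.maxPartitionBytes", (8 * 1024 * 1024).toString)
scala> // estimated cost of opening a file, in bytes (default: 4 MB); a larger
scala> // value packs more small files into a single partition
scala> spark.conf.set("spark.sql.files.openCostInBytes", (8 * 1024 * 1024).toString)
scala> spark.read.text("perf_test1.csv").rdd.getNumPartitions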
// maropu

On Tue, May 31, 2016 at 11:08 PM, Maciej Sokołowski <matemac...@gmail.com> wrote:

> After setting shuffle to true I get the expected 128 partitions, but I'm
> worried about the performance of such a solution; in particular, I can see
> that some shuffling is done, because the sizes of the partitions change:
>
> scala> sc.textFile("hdfs:///proj/dFAB_test/testdata/perf_test1.csv",
> minPartitions=128).coalesce(128, true).mapPartitions{rows =>
> Iterator(rows.length)}.collect()
> res3: Array[Int] = Array(768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 828, 896, 896, 896, 896, 896, 896,
> 896, 896, 896, 896, 896, 896, 850, 786, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768)
>
> I use Spark 1.6.0.
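For what it's worth, in the RDD API repartition(n) is simply coalesce(n,
shuffle = true), so the shuffle you see is the price of guaranteeing exactly
n partitions; a minimal sketch against the same file:

scala> // repartition(128) is shorthand for coalesce(128, shuffle = true):
scala> // it triggers a full shuffle, but always yields exactly 128 partitions
scala> sc.textFile("perf_test1.csv", minPartitions=128).repartition(128).getNumPartitions
res0: Int = 128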
> On 31 May 2016 at 16:02, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> The value of shuffle is false by default.
>>
>> Have you tried setting it to true?
>>
>> Which Spark release are you using?
>>
>> On Tue, May 31, 2016 at 6:13 AM, Maciej Sokołowski <matemac...@gmail.com>
>> wrote:
>>
>>> Hello Spark users and developers.
>>>
>>> I read a file and want to ensure that it has an exact number of
>>> partitions, for example 128.
>>>
>>> In the documentation I found:
>>>
>>> def textFile(path: String, minPartitions: Int = defaultMinPartitions):
>>> RDD[String]
>>>
>>> But the argument here is the minimal number of partitions, so I use
>>> coalesce to ensure the desired number of partitions:
>>>
>>> def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord:
>>> Ordering[T] = null): RDD[T]
>>> // Return a new RDD that is reduced into numPartitions partitions.
>>>
>>> So I combine them and get a number of partitions lower than expected:
>>>
>>> scala> sc.textFile("perf_test1.csv",
>>> minPartitions=128).coalesce(128).getNumPartitions
>>> res14: Int = 126
>>>
>>> Is this expected behaviour? The file contains 100000 lines; these are
>>> the sizes of the partitions before and after coalesce:
>>>
>>> scala> sc.textFile("perf_test1.csv",
>>> minPartitions=128).mapPartitions{rows => Iterator(rows.length)}.collect()
>>> res16: Array[Int] = Array(782, 781, 782, 781, 781, 782, 781, 781, 781,
>>> 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782,
>>> 781, 781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782,
>>> 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781,
>>> 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>>> 781, 782, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781,
>>> 781, 782, 781, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781)
>>>
>>> scala> sc.textFile("perf_test1.csv",
>>> minPartitions=128).coalesce(128).mapPartitions{rows =>
>>> Iterator(rows.length)}.collect()
>>> res15: Array[Int] = Array(1563, 781, 781, 781, 782, 781, 781, 781, 781,
>>> 782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 781, 782, 781, 781, 781,
>>> 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 782, 781, 781, 781, 781, 1563, 782, 781, 781, 782, 781, 781, 781, 781,
>>> 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>>> 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 782,
>>> 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782)
>>>
>>> So two partitions are double the size. Is this expected behaviour, or is
>>> it some kind of bug?
>>>
>>> Thanks,
>>> Maciej Sokołowski

--
---
Takeshi Yamamuro