If you don't hesitate the newest version, you try to use v2.0-preview.
http://spark.apache.org/news/spark-2.0.0-preview.html

There, you can control #partitions for input partitions without shuffles by
two parameters below;
spark.sql.files.maxPartitionBytes
spark.sql.files.openCostInBytes
( Not documented though, ....

// maropu

On Tue, May 31, 2016 at 11:08 PM, Maciej Sokołowski <matemac...@gmail.com>
wrote:

> After setting shuffle to true I get expected 128 partitions, but I'm
> worried about performance of such solution - especially I see that some
> shuffling is done because size of partitions chages:
>
> scala> sc.textFile("hdfs:///proj/dFAB_test/testdata/perf_test1.csv",
> minPartitions=128).coalesce(128, true).mapPartitions{rows =>
> Iterator(rows.length)}.collect()
> res3: Array[Int] = Array(768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 828, 896, 896, 896, 896, 896, 896,
> 896, 896, 896, 896, 896, 896, 850, 786, 768, 768, 768, 768, 768, 768, 768,
> 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768, 768)
>
> I use spark 1.6.0
>
>
> On 31 May 2016 at 16:02, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Value for shuffle is false by default.
>>
>> Have you tried setting it to true ?
>>
>> Which Spark release are you using ?
>>
>> On Tue, May 31, 2016 at 6:13 AM, Maciej Sokołowski <matemac...@gmail.com>
>> wrote:
>>
>>> Hello Spark users and developers.
>>>
>>> I read file and want to ensure that it has exact number of partitions,
>>> for example 128.
>>>
>>> In documentation I found:
>>>
>>> def textFile(path: String, minPartitions: Int = defaultMinPartitions):
>>> RDD[String]
>>>
>>> But argument here is minimal number of partitions, so I use coalesce to
>>> ensure desired number of partitions:
>>>
>>> def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord:
>>> Ordering[T] = null): RDD[T]
>>> //Return a new RDD that is reduced into numPartitions partitions.
>>>
>>> So I combine them and get number of partitions lower than expected:
>>>
>>> scala> sc.textFile("perf_test1.csv",
>>> minPartitions=128).coalesce(128).getNumPartitions
>>> res14: Int = 126
>>>
>>> Is this expected behaviour? File contains 100000 lines, size of
>>> partitions before and after coalesce:
>>>
>>> scala> sc.textFile("perf_test1.csv",
>>> minPartitions=128).mapPartitions{rows => Iterator(rows.length)}.collect()
>>> res16: Array[Int] = Array(782, 781, 782, 781, 781, 782, 781, 781, 781,
>>> 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782,
>>> 781, 781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782,
>>> 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781,
>>> 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>>> 781, 782, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781,
>>> 781, 782, 781, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781)
>>>
>>> scala> sc.textFile("perf_test1.csv",
>>> minPartitions=128).coalesce(128).mapPartitions{rows =>
>>> Iterator(rows.length)}.collect()
>>> res15: Array[Int] = Array(1563, 781, 781, 781, 782, 781, 781, 781, 781,
>>> 782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 781, 782, 781, 781, 781,
>>> 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 782, 781, 781, 781, 781, 1563, 782, 781, 781, 782, 781, 781, 781, 781,
>>> 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>>> 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 782,
>>> 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782)
>>>
>>> So two partitions are double the size. Is this expected behaviour or is
>>> it some kind of bug?
>>>
>>> Thanks,
>>> Maciej Sokołowski
>>>
>>
>>
>


-- 
---
Takeshi Yamamuro

Reply via email to