Hi,

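"csvDF = csvDF.sort(orderByColName, ascending=False)" repartitions the
DataFrame using a RangePartitioner (the number of partitions depends on
"spark.sql.shuffle.partitions"), so the earlier coalesce(numPartitions) no
longer determines the number of output files.
It seems that, in your case, some empty partitions were removed, which is
why you ended up with 17 partitions.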
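
The layout being asked for (rows in global sort order, at most 100 per file) can be sketched in plain Python without Spark. This is only a model of the idea, not Spark code: the helper names num_files, file_index and split_sorted are hypothetical, and the (tag, count) rows are made-up stand-in data sized to match the 3598 rows in the original mail. The point is that the file a row belongs to follows from its position in the sorted order, not from the value of the sort key.

```python
import math


def num_files(num_rows, rows_per_file):
    """Smallest number of files such that none holds more than rows_per_file rows."""
    return int(math.ceil(num_rows / float(rows_per_file)))


def file_index(rank, rows_per_file):
    """File for the row at 0-based position `rank` in the sorted order."""
    return rank // rows_per_file


def split_sorted(rows, key, rows_per_file, descending=True):
    """Sort rows by key, then cut the sorted sequence into bounded-size files."""
    ordered = sorted(rows, key=key, reverse=descending)
    files = [[] for _ in range(num_files(len(ordered), rows_per_file))]
    for rank, row in enumerate(ordered):
        files[file_index(rank, rows_per_file)].append(row)
    return files


# Hypothetical stand-in data: 3598 (tag, count) rows, most of them with count == 1.
rows = [("tag%d" % i, 1 if i % 4 else i) for i in range(3598)]
files = split_sorted(rows, key=lambda r: r[1], rows_per_file=100)

# Number of files, largest file, and size of the last file.
print(len(files), max(len(f) for f in files), len(files[-1]))
```

In PySpark, a similar position-based index could probably be obtained by calling zipWithIndex() on the sorted RDD and passing index // 100 as the key to partitionBy(numPartitions, partitionFunc). That combination is an assumption and has not been checked against 1.6.1.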

// maropu

On Wed, Mar 30, 2016 at 6:49 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I have a requirement to write my results out into a series of CSV files.
> No file may have more than 100 rows of data. In the past my data was not
> sorted, and I was able to use repartition() or coalesce() to meet the
> file length requirement.
>
> I realize that repartition() causes the data to be shuffled. It appears
> that this changes the data ordering, so I sort the repartitioned data again.
>
> What is really strange is that I no longer get the number of output files I
> am expecting, and the number-of-lines constraint is now violated.
>
> I am using spark-1.6.1
>
> Andy
>
> $ for i in topTags_CSV/*.csv; do wc -l $i; done
>       19 topTags_CSV/part-00000.csv
>       19 topTags_CSV/part-00001.csv
>       20 topTags_CSV/part-00002.csv
>       19 topTags_CSV/part-00003.csv
>       22 topTags_CSV/part-00004.csv
>       19 topTags_CSV/part-00005.csv
>       26 topTags_CSV/part-00006.csv
>       18 topTags_CSV/part-00007.csv
>       12 topTags_CSV/part-00008.csv
>       25 topTags_CSV/part-00009.csv
>       32 topTags_CSV/part-00010.csv
>       53 topTags_CSV/part-00011.csv
>       89 topTags_CSV/part-00012.csv
>      146 topTags_CSV/part-00013.csv
>      387 topTags_CSV/part-00014.csv
>     2708 topTags_CSV/part-00015.csv
>        1 topTags_CSV/part-00016.csv
> $
>
> numRowsPerCSVFile = 100
> numRows = resultDF.count()
> quotient, remander = divmod(numRows, numRowsPerCSVFile)
> numPartitions = (quotient + 1) if remander > 0 else quotient
>
> debugStr = ("numRows:{0} quotient:{1} remander:{2} repartition({3})"
>                 .format(numRows, quotient, remander, numPartitions))
> print(debugStr)
>
> csvDF = resultDF.coalesce(numPartitions)
>
> orderByColName = "count"
> csvDF = csvDF.sort(orderByColName, ascending=False)
> headerArg = 'true'  # if headers else 'false'
> csvDF.write.save(outputDir, 'com.databricks.spark.csv', header=headerArg)
> renamePartFiles(outputDir)
>
> numRows:3598 quotient:35 remander:98 repartition(36)
>


-- 
---
Takeshi Yamamuro
