Re: data frame problem preserving sort order with repartition() and coalesce()

2016-03-30 Thread Takeshi Yamamuro
Hi,

"csvDF = csvDF.sort(orderByColName, ascending=False)" repartitions the
DataFrame using a RangePartitioner
(the number of partitions depends on "spark.sql.shuffle.partitions").
It seems that, in your case, some empty partitions were removed, which is
why you ended up with 17 partitions.
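The effect can be illustrated without Spark. With a skewed sort key, a range
partitioner's boundaries leave many ranges empty, so far fewer non-empty
partitions come out than were requested. A minimal plain-Python sketch (the
even-width bucketing here is a simplification; Spark's RangePartitioner picks
boundaries by sampling the data):

```python
# Plain-Python sketch (not Spark): range-partitioning a skewed key
# leaves many ranges empty, so the number of non-empty partitions is
# far smaller than the number requested.
num_partitions = 36
# Skewed "count" column: a few huge values, a long tail of tiny ones.
data = [1000 // (i + 1) for i in range(3598)]

hi = max(data)
buckets = [0] * num_partitions
for v in data:
    # Even-width ranges; Spark's RangePartitioner samples boundaries instead.
    idx = min(v * num_partitions // (hi + 1), num_partitions - 1)
    buckets[idx] += 1

non_empty = sum(1 for b in buckets if b > 0)
print(non_empty)  # much smaller than 36
```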

// maropu

On Wed, Mar 30, 2016 at 6:49 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I have a requirement to write my results out into a series of CSV files.
> No file may have more than 100 rows of data. In the past my data was not
> sorted, and I was able to use repartition() or coalesce() to ensure the
> file length requirement.
>
> I realize that repartition() causes the data to be shuffled, which
> appears to change the data ordering, so I sort the repartitioned data
> again.
>
> What is really strange is that I no longer get the number of output
> files I am expecting, and the line-count constraint is now violated.
>
> I am using spark-1.6.1
>
> Andy


-- 
---
Takeshi Yamamuro


data frame problem preserving sort order with repartition() and coalesce()

2016-03-29 Thread Andy Davidson
I have a requirement to write my results out into a series of CSV files. No
file may have more than 100 rows of data. In the past my data was not
sorted, and I was able to use repartition() or coalesce() to ensure the file
length requirement.

I realize that repartition() causes the data to be shuffled, which appears
to change the data ordering, so I sort the repartitioned data again.

What is really strange is that I no longer get the number of output files I
am expecting, and the line-count constraint is now violated.

I am using spark-1.6.1

Andy

$ for i in topTags_CSV/*.csv; do wc -l $i; done

  19 topTags_CSV/part-00000.csv
  19 topTags_CSV/part-00001.csv
  20 topTags_CSV/part-00002.csv
  19 topTags_CSV/part-00003.csv
  22 topTags_CSV/part-00004.csv
  19 topTags_CSV/part-00005.csv
  26 topTags_CSV/part-00006.csv
  18 topTags_CSV/part-00007.csv
  12 topTags_CSV/part-00008.csv
  25 topTags_CSV/part-00009.csv
  32 topTags_CSV/part-00010.csv
  53 topTags_CSV/part-00011.csv
  89 topTags_CSV/part-00012.csv
 146 topTags_CSV/part-00013.csv
 387 topTags_CSV/part-00014.csv
2708 topTags_CSV/part-00015.csv
   1 topTags_CSV/part-00016.csv
$


numRowsPerCSVFile = 100
numRows = resultDF.count()
quotient, remainder = divmod(numRows, numRowsPerCSVFile)
numPartitions = (quotient + 1) if remainder > 0 else quotient

debugStr = ("numRows:{0} quotient:{1} remainder:{2} repartition({3})"
            .format(numRows, quotient, remainder, numPartitions))
print(debugStr)

csvDF = resultDF.coalesce(numPartitions)

orderByColName = "count"
csvDF = csvDF.sort(orderByColName, ascending=False)
headerArg = 'true'  # if headers else 'false'
csvDF.write.save(outputDir, 'com.databricks.spark.csv', header=headerArg)
renamePartFiles(outputDir)

numRows:3598 quotient:35 remainder:98 repartition(36)
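One way to meet the per-file limit while keeping the sort order is to sort
first and only then split rows into files by position, so every file except
the last holds exactly numRowsPerCSVFile rows and the global order is
preserved across files. In Spark this would correspond roughly to sorting,
then using rdd.zipWithIndex() and grouping rows by index //
numRowsPerCSVFile before writing. A plain-Python sketch of the idea (the
sorted range below is a stand-in for the sorted "count" column):

```python
# Sketch (plain Python, not Spark): after sorting, assign each row to a
# file by its position, so no file exceeds the row limit and the global
# sort order is preserved across files.
numRowsPerCSVFile = 100
rows = sorted(range(3598), reverse=True)  # stand-in for sorted data

files = {}
for idx, row in enumerate(rows):
    files.setdefault(idx // numRowsPerCSVFile, []).append(row)

print(len(files))                           # 36 files
print(max(len(f) for f in files.values()))  # no file exceeds 100 rows
```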