I have a requirement to write my results out into a series of CSV files, where
no file may have more than 100 rows of data. In the past my data was not
sorted, and I was able to use repartition() or coalesce() to meet the
file-length requirement.

I realize that repartition() causes the data to be shuffled, and it appears
that this changes the data ordering, so I sort the repartitioned data again.
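
As a quick sanity check (a sketch only, not from my actual script; df stands
in for any DataFrame), the partition count can be inspected before and after
the sort:

numParts = 4  # stand-in value for illustration
coalescedDF = df.coalesce(numParts)
sortedDF = coalescedDF.sort("count", ascending=False)
print(coalescedDF.rdd.getNumPartitions())  # the count coalesce() asked for
print(sortedDF.rdd.getNumPartitions())     # the count after sort(); may differ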

What is really strange is that I no longer get the number of output files I am
expecting, and the row-count constraint is now violated.

I am using spark-1.6.1

Andy

$ for i in topTags_CSV/*.csv; do wc -l $i; done
      19 topTags_CSV/part-00000.csv
      19 topTags_CSV/part-00001.csv
      20 topTags_CSV/part-00002.csv
      19 topTags_CSV/part-00003.csv
      22 topTags_CSV/part-00004.csv
      19 topTags_CSV/part-00005.csv
      26 topTags_CSV/part-00006.csv
      18 topTags_CSV/part-00007.csv
      12 topTags_CSV/part-00008.csv
      25 topTags_CSV/part-00009.csv
      32 topTags_CSV/part-00010.csv
      53 topTags_CSV/part-00011.csv
      89 topTags_CSV/part-00012.csv
     146 topTags_CSV/part-00013.csv
     387 topTags_CSV/part-00014.csv
    2708 topTags_CSV/part-00015.csv
       1 topTags_CSV/part-00016.csv
$


numRowsPerCSVFile = 100
numRows = resultDF.count()  # resultDF is built earlier in my script
quotient, remainder = divmod(numRows, numRowsPerCSVFile)
numPartitions = (quotient + 1) if remainder > 0 else quotient

debugStr = ("numRows:{0} quotient:{1} remainder:{2} coalesce({3})"
                .format(numRows, quotient, remainder, numPartitions))
print(debugStr)

csvDF = resultDF.coalesce(numPartitions)

orderByColName = "count"
csvDF = csvDF.sort(orderByColName, ascending=False)
headerArg = 'true'  # if headers else 'false'
csvDF.write.save(outputDir, 'com.databricks.spark.csv', header=headerArg)
renamePartFiles(outputDir)  # helper defined elsewhere in my script

which prints:

numRows:3598 quotient:35 remainder:98 coalesce(36)
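
If it helps with debugging, here is a sketch of how the rows per partition
could be dumped just before write.save(); mapPartitions here is only a
diagnostic, not part of my real script:

partitionSizes = (csvDF.rdd
                  .mapPartitions(lambda rows: [sum(1 for _ in rows)])
                  .collect())
print(partitionSizes)  # one row count per partition, comparable to wc -l above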
