I have a requirement to write my results out into a series of CSV files. No file may have more than 100 rows of data. In the past my data was not sorted, and I was able to use repartition() or coalesce() to ensure the file-length requirement.
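The partition-count calculation behind this approach is just a ceiling division: divide the row count by the per-file limit and round up. A minimal standalone sketch (the 3598-row total is taken from the debug output in this question; in practice it would come from resultDF.count()):

```python
numRowsPerCSVFile = 100   # hard limit of rows per output file
numRows = 3598            # illustrative total row count

# Ceiling division: one extra partition if there is a remainder.
quotient, remainder = divmod(numRows, numRowsPerCSVFile)
numPartitions = quotient + 1 if remainder > 0 else quotient

print(numPartitions)  # 36
```

With an even spread of 3598 rows over 36 partitions, each file would hold at most 100 rows; the problem described below is that the spread stops being even once a sort is introduced.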
I realize that repartition() causes the data to be shuffled, and that appears to change the data ordering, so I sort the repartitioned data again. What is really strange is that I no longer get the number of output files I am expecting, and the number-of-lines constraint is violated. I am using spark-1.6.1.

```
Andy $ for i in topTags_CSV/*.csv; do wc -l $i; done
  19 topTags_CSV/part-00000.csv
  19 topTags_CSV/part-00001.csv
  20 topTags_CSV/part-00002.csv
  19 topTags_CSV/part-00003.csv
  22 topTags_CSV/part-00004.csv
  19 topTags_CSV/part-00005.csv
  26 topTags_CSV/part-00006.csv
  18 topTags_CSV/part-00007.csv
  12 topTags_CSV/part-00008.csv
  25 topTags_CSV/part-00009.csv
  32 topTags_CSV/part-00010.csv
  53 topTags_CSV/part-00011.csv
  89 topTags_CSV/part-00012.csv
 146 topTags_CSV/part-00013.csv
 387 topTags_CSV/part-00014.csv
2708 topTags_CSV/part-00015.csv
   1 topTags_CSV/part-00016.csv
```

Here is the code:

```python
numRowsPerCSVFile = 100
numRows = resultDF.count()
quotient, remainder = divmod(numRows, numRowsPerCSVFile)
numPartitions = (quotient + 1) if remainder > 0 else quotient
debugStr = ("numRows:{0} quotient:{1} remainder:{2} repartition({3})"
            .format(numRows, quotient, remainder, numPartitions))
print(debugStr)

csvDF = resultDF.coalesce(numPartitions)
orderByColName = "count"
csvDF = csvDF.sort(orderByColName, ascending=False)

headerArg = 'true'  # if headers else 'false'
csvDF.write.save(outputDir, 'com.databricks.spark.csv', header=headerArg)
renamePartFiles(outputDir)
```

And the debug output:

```
numRows:3598 quotient:35 remainder:98 repartition(36)
```