Hi,

"csvDF = csvDF.sort(orderByColName, ascending=False)" repartitions the DataFrame using a RangePartitioner (the number of partitions depends on "spark.sql.shuffle.partitions"). It seems that, in your case, some empty partitions were removed, so you ended up with 17 partitions.
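For what it's worth, the partition count your code derives with divmod is just a ceiling division, and one way to keep that count through the sort is to set the shuffle partition count before sorting, so the sort's range-partitioning shuffle itself produces the desired number of output files. A minimal sketch (plain Python for the arithmetic; the Spark 1.6 calls are shown as comments since they assume a live SQLContext and your own outputDir):

```python
import math

def num_csv_partitions(num_rows, rows_per_file=100):
    # Smallest partition count such that no output file exceeds rows_per_file rows.
    return math.ceil(num_rows / rows_per_file)

# For the numbers in this thread: 3598 rows at <= 100 rows/file -> 36 files.
print(num_csv_partitions(3598))  # -> 36

# Untested sketch of the write path: configure the shuffle partition count
# *before* the sort, so the sort's shuffle (RangePartitioner) yields exactly
# this many partitions, each globally ordered relative to the others:
#
#   numPartitions = num_csv_partitions(resultDF.count())
#   sqlContext.setConf("spark.sql.shuffle.partitions", str(numPartitions))
#   csvDF = resultDF.sort("count", ascending=False)
#   csvDF.write.save(outputDir, "com.databricks.spark.csv", header="true")
```

Note that empty partitions can still make the file count come out lower than numPartitions, as described above, but no file should exceed the row limit.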
// maropu

On Wed, Mar 30, 2016 at 6:49 AM, Andy Davidson <a...@santacruzintegration.com> wrote:

> I have a requirement to write my results out into a series of CSV files.
> No file may have more than 100 rows of data. In the past my data was not
> sorted, and I was able to use repartition() or coalesce() to meet the
> file-length requirement.
>
> I realize that repartition() causes the data to be shuffled. It appears
> that this changes the data ordering, so I sort the repartitioned data
> again.
>
> What is really strange is that I no longer get the number of output files
> I am expecting, and the number-of-lines constraint is now violated.
>
> I am using spark-1.6.1
>
> Andy
>
> $ for i in topTags_CSV/*.csv; do wc -l $i; done
>
>   19 topTags_CSV/part-00000.csv
>   19 topTags_CSV/part-00001.csv
>   20 topTags_CSV/part-00002.csv
>   19 topTags_CSV/part-00003.csv
>   22 topTags_CSV/part-00004.csv
>   19 topTags_CSV/part-00005.csv
>   26 topTags_CSV/part-00006.csv
>   18 topTags_CSV/part-00007.csv
>   12 topTags_CSV/part-00008.csv
>   25 topTags_CSV/part-00009.csv
>   32 topTags_CSV/part-00010.csv
>   53 topTags_CSV/part-00011.csv
>   89 topTags_CSV/part-00012.csv
>  146 topTags_CSV/part-00013.csv
>  387 topTags_CSV/part-00014.csv
> 2708 topTags_CSV/part-00015.csv
>    1 topTags_CSV/part-00016.csv
> $
>
> numRowsPerCSVFile = 100
> numRows = resultDF.count()
> quotient, remander = divmod(numRows, numRowsPerCSVFile)
> numPartitions = (quotient + 1) if remander > 0 else quotient
>
> debugStr = ("numRows:{0} quotient:{1} remander:{2} repartition({3})"
>             .format(numRows, quotient, remander, numPartitions))
> print(debugStr)
>
> csvDF = resultDF.coalesce(numPartitions)
>
> orderByColName = "count"
> csvDF = csvDF.sort(orderByColName, ascending=False)
> headerArg = 'true'  # if headers else 'false'
> csvDF.write.save(outputDir, 'com.databricks.spark.csv', header=headerArg)
> renamePartFiles(outputDir)
>
> numRows:3598 quotient:35 remander:98 repartition(36)
--
---
Takeshi Yamamuro