Re: How to control count / size of output files for
Hi,

Thank you, the suggestion is very good: there is indeed no need to use "repartitionByRange". One small doubt remains, though: if the output files are required to be globally ordered, "repartition" after the sort will disrupt the order of the data, whereas "coalesce" after the sort gives the correct, ordered result.

Best Regards,
m li

Gourav Sengupta wrote
> Hi,
>
> firstly there is no need to use repartition by range. The repartition, or
> coalesce clause can come after the sort and everything will be fine.
>
> Secondly, to reduce the number of records per file there is no need to
> use repartition; just sort and then write out the files using the
> property spark.sql.files.maxRecordsPerFile. Unless there is skew in the
> data, things will work out fine.
>
> Regards,
> Gourav Sengupta
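A minimal sketch of the coalesce variant m li describes, reusing the toy DataFrame from earlier in the thread (outputPath is a placeholder path defined elsewhere, as in the other snippets):

scala> val df = sc.parallelize(1 to 1000, 10).toDF("v")

scala> df.sort("v").coalesce(5).write.parquet(outputPath)

Unlike repartition, coalesce only merges existing partitions without a shuffle, which is why it does not move rows between the range-partitioned, sorted partitions produced by the sort.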
Re: How to control count / size of output files for
Hi,

Firstly, there is no need to use repartitionByRange: the repartition or coalesce clause can come after the sort and everything will be fine.

Secondly, to reduce the number of records per file there is no need to use repartition either; just sort and then write out the files using the property spark.sql.files.maxRecordsPerFile. Unless there is skew in the data, things will work out fine.

Regards,
Gourav Sengupta

On Mon, Mar 8, 2021 at 4:01 PM m li wrote:
> Hi Ivan,
>
> If the error you are referring to is that the data is out of order, it
> may be that "repartition" put the data out of order. You can try
> "repartitionByRange":
>
> scala> val df = sc.parallelize(1 to 1000, 10).toDF("v")
>
> scala> df.repartitionByRange(5, column("v")).sortWithinPartitions("v").write.parquet(outputPath)
>
> Best Regards,
>
> m li
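A minimal sketch of the maxRecordsPerFile approach, assuming the dataset and outputPath from the original question (the limit of 1000000 records is an arbitrary example value):

scala> spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000)

scala> dataset.sort("long_repeated_string_in_this_column").write.parquet(outputPath)

With the property set, each write task rolls over to a new file once it has written the configured number of records, so file sizes are bounded without an extra repartition or coalesce.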
Re: How to control count / size of output files for
Hi Ivan,

If the error you are referring to is that the data is out of order, it may be that "repartition" put the data out of order. You can try "repartitionByRange" instead:

scala> val df = sc.parallelize(1 to 1000, 10).toDF("v")

scala> df.repartitionByRange(5, column("v")).sortWithinPartitions("v").write.parquet(outputPath)

Best Regards,

m li

Ivan Petrov wrote
> Ah... makes sense, thank you. I tried sortWithinPartitions before and
> replaced it with sort. It was a mistake.
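Why this keeps the output globally ordered: repartitionByRange range-partitions the rows on "v", so each of the 5 partitions holds a contiguous range of values, and sorting within each partition is then enough. A quick check (a sketch, reusing the df above), which should report 5 partitions:

scala> val ranged = df.repartitionByRange(5, column("v"))

scala> ranged.rdd.getNumPartitions
res0: Int = 5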
Re: How to control count / size of output files for
Hi Ivan,

Sorry, but it always helps to know the version of Spark you are using, its environment, the format you are writing your files out to, and any other details if possible.

Regards,
Gourav Sengupta

On Wed, Feb 24, 2021 at 3:43 PM Ivan Petrov wrote:
> Hi, I'm trying to control the size and/or count of spark output.
>
> Here is my code. I expect to get 5 files but I get dozens of small files.
> Why?
>
> dataset
>   .repartition(5)
>   .sort("long_repeated_string_in_this_column") // should be better compressed with snappy
>   .write
>   .parquet(outputPath)
Re: How to control count / size of output files for
Ah... makes sense, thank you. I tried sortWithinPartitions before and replaced it with sort. It was a mistake.

On Thu, 25 Feb 2021 at 15:25, Pietro Gentile <pietro.gentile89.develo...@gmail.com> wrote:

> Hi,
>
> It is because of the *repartition* before the *sort* method invocation.
> If you reverse them you'll see 5 output files.
>
> Regards,
> Pietro
>
> On Wed, 24 Feb 2021 at 16:43, Ivan Petrov wrote:
>
>> Hi, I'm trying to control the size and/or count of spark output.
>>
>> Here is my code. I expect to get 5 files but I get dozens of small
>> files. Why?
>>
>> dataset
>>   .repartition(5)
>>   .sort("long_repeated_string_in_this_column") // should be better compressed with snappy
>>   .write
>>   .parquet(outputPath)
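For reference, a sketch of the snippet with the two calls reversed, as Pietro suggests (same dataset and outputPath as above):

dataset
  .sort("long_repeated_string_in_this_column")
  .repartition(5)
  .write
  .parquet(outputPath)

This produces exactly 5 output files, but note the caveat raised further up the thread: repartition shuffles the sorted rows, so it is only appropriate when the global order of the files does not matter (coalesce(5) keeps the order).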
Re: How to control count / size of output files for
Hi!

It is because of "spark.sql.shuffle.partitions". See the value 200 in the physical plan at the rangepartitioning:

scala> val df = sc.parallelize(1 to 1000, 10).toDF("v").sort("v")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [v: int]

scala> df.explain()
== Physical Plan ==
*(2) Sort [v#300 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(v#300 ASC NULLS FIRST, 200), true, [id=#334]
   +- *(1) Project [value#297 AS v#300]
      +- *(1) SerializeFromObject [input[0, int, false] AS value#297]
         +- Scan[obj#296]

scala> df.rdd.getNumPartitions
res13: Int = 200

Best Regards,
Attila
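A sketch of the corresponding fix: lowering the property before the sort changes the number of shuffle partitions, and hence the number of output files (the value 5 is just an example), so the partition count should now come out as 5:

scala> spark.conf.set("spark.sql.shuffle.partitions", 5)

scala> val df = sc.parallelize(1 to 1000, 10).toDF("v").sort("v")

scala> df.rdd.getNumPartitions
res14: Int = 5

With only 5 shuffle partitions, the range-partitioned sort writes 5 globally ordered files directly, with no repartition or coalesce needed.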