Re: How to control count / size of output files for

2021-03-10 Thread m li
Hi,

Thank you, that is a good suggestion. There is indeed no need to use
"repartitionByRange".

One small doubt, though: if the output files are required to be globally
ordered, "repartition" after the sort will shuffle the data again and break
that order, whereas "coalesce" after the sort keeps the result correctly
ordered.
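A minimal sketch of the difference, run in spark-shell (the column name and
output paths below are just placeholders; the ordering behaviour is as
described above):

// coalesce only merges the sorted partitions, so the global order by "v"
// should be preserved in the written files (note: coalesce can also reduce
// the parallelism of the sort itself)
val df = sc.parallelize(1 to 1000, 10).toDF("v")
df.sort("v").coalesce(5).write.parquet("/tmp/ordered_output")

// repartition after the sort triggers a round-robin shuffle, so rows from
// the sorted partitions get interleaved and the global order is lost
df.sort("v").repartition(5).write.parquet("/tmp/unordered_output")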

Best Regards,
m li


Gourav Sengupta wrote
> Hi,
> 
> firstly, there is no need to use repartitionByRange. The repartition or
> coalesce call can come after the sort and everything will be fine.
> 
> 
> Secondly, to reduce the number of records per file there is no need to use
> repartition either; just sort and then write the files out with the
> property spark.sql.files.maxRecordsPerFile set. Unless there is skew in
> the data, things will work out fine.
> 
> 
> Regards,
> Gourav Sengupta
> 
> On Mon, Mar 8, 2021 at 4:01 PM m li <xiyunanmenwai@> wrote:
> 
>> Hi Ivan,
>>
>>
>>
>> If the error you are referring to is that the data ends up out of order,
>> it may be because “repartition” shuffles the data and breaks the ordering.
>> You can try “repartitionByRange” instead:
>>
>> scala> val df = sc.parallelize (1 to 1000, 10).toDF("v")
>>
>> scala> df.repartitionByRange(5,column("v")).sortWithinPartitions("v").
>> write.parquet(outputPath)
>>
>>
>>
>> Best Regards,
>>
>> m li
>> Ivan Petrov wrote
>> > Ah... makes sense, thank you. i tried sortWithinPartition before and
>> > replaced with sort. It was a mistake.
>> >
>> > On Thu, 25 Feb 2021 at 15:25, Pietro Gentile <pietro.gentile89.developer@> wrote:
>> >
>> >> Hi,
>> >>
>> >> It is because of *repartition* before the *sort* method invocation. If
>> >> you reverse them you'll see 5 output files.
>> >>
>> >> Regards,
>> >> Pietro
>> >>
>> >> On Wed, 24 Feb 2021 at 16:43, Ivan Petrov <capacytron@> wrote:
>> >>
>> >>> Hi, I'm trying to control the size and/or count of spark output.
>> >>>
>> >>> Here is my code. I expect to get 5 files  but I get dozens of small
>> >>> files.
>> >>> Why?
>> >>>
>> >>> dataset
>> >>> .repartition(5)
>> >>> .sort("long_repeated_string_in_this_column") // should be better compressed with snappy
>> >>> .write
>> >>> .parquet(outputPath)
>> >>>
>> >>



Re: How to control count / size of output files for

2021-03-08 Thread Gourav Sengupta
Hi,

firstly, there is no need to use repartitionByRange. The repartition or
coalesce call can come after the sort and everything will be fine.


Secondly, to reduce the number of records per file there is no need to use
repartition either; just sort and then write the files out with the
property spark.sql.files.maxRecordsPerFile set. Unless there is skew in
the data, things will work out fine.
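For example, something along these lines should do it (a rough sketch;
10000 is just an illustrative value, and dataset, the column name and
outputPath are taken from the original question):

// cap how many records can end up in any single output file
spark.conf.set("spark.sql.files.maxRecordsPerFile", 10000)

dataset
  .sort("long_repeated_string_in_this_column") // sort for better compression
  .write
  .parquet(outputPath)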


Regards,
Gourav Sengupta

On Mon, Mar 8, 2021 at 4:01 PM m li  wrote:

> Hi Ivan,
>
>
>
> If the error you are referring to is that the data ends up out of order,
> it may be because “repartition” shuffles the data and breaks the ordering.
> You can try “repartitionByRange” instead:
>
> scala> val df = sc.parallelize (1 to 1000, 10).toDF("v")
>
> scala> df.repartitionByRange(5,column("v")).sortWithinPartitions("v").
> write.parquet(outputPath)
>
>
>
> Best Regards,
>
> m li
> Ivan Petrov wrote
> > Ah... makes sense, thank you. i tried sortWithinPartition before and
> > replaced with sort. It was a mistake.
> >
> > On Thu, 25 Feb 2021 at 15:25, Pietro Gentile <pietro.gentile89.developer@> wrote:
> >
> >> Hi,
> >>
> >> It is because of *repartition* before the *sort* method invocation. If
> >> you reverse them you'll see 5 output files.
> >>
> >> Regards,
> >> Pietro
> >>
> >> On Wed, 24 Feb 2021 at 16:43, Ivan Petrov <capacytron@> wrote:
> >>
> >>> Hi, I'm trying to control the size and/or count of spark output.
> >>>
> >>> Here is my code. I expect to get 5 files  but I get dozens of small
> >>> files.
> >>> Why?
> >>>
> >>> dataset
> >>> .repartition(5)
> >>> .sort("long_repeated_string_in_this_column") // should be better compressed with snappy
> >>> .write
> >>> .parquet(outputPath)
> >>>
> >>


Re: How to control count / size of output files for

2021-03-08 Thread m li
Hi Ivan,



If the error you are referring to is that the data ends up out of order,
it may be because “repartition” shuffles the data and breaks the ordering.
You can try “repartitionByRange” instead:

scala> val df = sc.parallelize(1 to 1000, 10).toDF("v")

scala> df.repartitionByRange(5, column("v")).sortWithinPartitions("v").write.parquet(outputPath)
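(This works for ordered output because “repartitionByRange” splits the rows
into disjoint, increasing ranges of "v"; after “sortWithinPartitions” each
of the five files is internally sorted and the files' ranges do not overlap.)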



Best Regards,

m li
Ivan Petrov wrote
> Ah... makes sense, thank you. i tried sortWithinPartition before and
> replaced with sort. It was a mistake.
> 
> On Thu, 25 Feb 2021 at 15:25, Pietro Gentile <pietro.gentile89.developer@> wrote:
> 
>> Hi,
>>
>> It is because of *repartition* before the *sort* method invocation. If
>> you reverse them you'll see 5 output files.
>>
>> Regards,
>> Pietro
>>
>> On Wed, 24 Feb 2021 at 16:43, Ivan Petrov <capacytron@> wrote:
>>
>>> Hi, I'm trying to control the size and/or count of spark output.
>>>
>>> Here is my code. I expect to get 5 files  but I get dozens of small
>>> files.
>>> Why?
>>>
>>> dataset
>>> .repartition(5)
>>> .sort("long_repeated_string_in_this_column") // should be better compressed with snappy
>>> .write
>>> .parquet(outputPath)
>>>
>>








Re: How to control count / size of output files for

2021-02-25 Thread Gourav Sengupta
Hi Ivan,

Sorry, but it always helps to know the version of Spark you are using, its
environment, the format you are writing your files out to, and any other
details if possible.


Regards,
Gourav Sengupta

On Wed, Feb 24, 2021 at 3:43 PM Ivan Petrov  wrote:

> Hi, I'm trying to control the size and/or count of spark output.
>
> Here is my code. I expect to get 5 files  but I get dozens of small files.
> Why?
>
> dataset
> .repartition(5)
> .sort("long_repeated_string_in_this_column") // should be better compressed with snappy
> .write
> .parquet(outputPath)
>


Re: How to control count / size of output files for

2021-02-25 Thread Ivan Petrov
Ah... makes sense, thank you. I tried sortWithinPartitions before and then
replaced it with sort; that was a mistake.
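For reference, this is roughly what the corrected version looks like (a
sketch reusing the names from my original example):

dataset
  .repartition(5)  // 5 partitions -> 5 output files
  .sortWithinPartitions("long_repeated_string_in_this_column") // sort stays inside each partition
  .write
  .parquet(outputPath)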

On Thu, 25 Feb 2021 at 15:25, Pietro Gentile
<pietro.gentile89.develo...@gmail.com> wrote:

> Hi,
>
> It is because of *repartition* before the *sort* method invocation. If
> you reverse them you'll see 5 output files.
>
> Regards,
> Pietro
>
> On Wed, 24 Feb 2021 at 16:43, Ivan Petrov wrote:
>
>> Hi, I'm trying to control the size and/or count of spark output.
>>
>> Here is my code. I expect to get 5 files  but I get dozens of small files.
>> Why?
>>
>> dataset
>> .repartition(5)
>> .sort("long_repeated_string_in_this_column") // should be better compressed with snappy
>> .write
>> .parquet(outputPath)
>>
>


Re: How to control count / size of output files for

2021-02-24 Thread Attila Zsolt Piros
Hi!

It is because of "spark.sql.shuffle.partitions". Note the value 200 in the
rangepartitioning exchange of the physical plan below:


scala> val df = sc.parallelize(1 to 1000, 10).toDF("v").sort("v")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [v: int]

scala> df.explain()
== Physical Plan ==
*(2) Sort [v#300 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(v#300 ASC NULLS FIRST, 200), true, [id=#334]
   +- *(1) Project [value#297 AS v#300]
      +- *(1) SerializeFromObject [input[0, int, false] AS value#297]
         +- Scan[obj#296]

scala> df.rdd.getNumPartitions
res13: Int = 200
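So one way to get the expected five output files is to lower that setting
before the sort. A rough sketch in spark-shell (outputPath is a placeholder):

scala> spark.conf.set("spark.sql.shuffle.partitions", 5)

scala> val sorted = sc.parallelize(1 to 1000, 10).toDF("v").sort("v")

scala> sorted.rdd.getNumPartitions   // should now be 5 instead of 200

scala> sorted.write.parquet(outputPath)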

Best Regards,
Attila










How to control count / size of output files for

2021-02-24 Thread Ivan Petrov
Hi, I'm trying to control the size and/or count of Spark's output files.

Here is my code. I expect to get 5 files, but I get dozens of small files.
Why?

dataset
.repartition(5)
.sort("long_repeated_string_in_this_column") // should be better compressed with snappy
.write
.parquet(outputPath)