Re: Spark 2.4 partitions and tasks

2019-02-25 Thread Pedro Tuero
Good question. From what I have read, Spark is not a magician: it can't know
how many tasks will work best for your input, so it can get it wrong.
Spark sets the default parallelism to twice the number of cores in the
cluster.
In my jobs, it seemed that using the parallelism inherited from the input
parts sometimes worked well, and it was 100x the default parallelism.
When every job started to use the default parallelism (apparently when
switching from EMR 5.16 to 5.20), I first tried adding some repartitions, but
in some cases it made no difference: the repartition job took as long as the
job I wanted to speed up (or failed outright).
Doing the repartition inside an operation on RDD pairs worked much better
(https://stackoverflow.com/questions/43027306/is-there-an-effective-partitioning-method-when-using-reducebykey-in-spark).
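For illustration, a minimal sketch of that idea in the Java RDD API (the class
name and toy input are made up; only the reduceByKey call matters): instead of
paying for a separate repartition() shuffle, the desired partition count is
handed to reduceByKey through a partitioner, so it is applied in the shuffle
that the operation performs anyway.

import java.util.Arrays;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class ReduceWithPartitioner {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("reduce-with-partitioner");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      JavaPairRDD<String, Long> pairs = sc
          .parallelize(Arrays.asList("a", "b", "a"))
          .mapToPair(w -> new Tuple2<>(w, 1L));

      // Derive the target parallelism from the input instead of hardcoding it.
      int numPartitions = pairs.getNumPartitions();

      // One shuffle, already at the desired parallelism, instead of a
      // repartition() shuffle followed by a second shuffle in reduceByKey.
      JavaPairRDD<String, Long> reduced =
          pairs.reduceByKey(new HashPartitioner(numPartitions), Long::sum);

      System.out.println("reduced partitions = " + reduced.getNumPartitions());
    }
  }
}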

It would be nice to have a more comprehensive way to see which RDDs need more
or less parallelism.

Regards,
Pedro.

On Sat, Feb 23, 2019 at 9:27 PM Yeikel (em...@yeikel.com) wrote:

> I am following up on this question because I have a similar issue.
>
> When do we need to control the parallelism manually? Skewed partitions?
>


Re: Spark 2.4 partitions and tasks

2019-02-23 Thread Yeikel
I am following up on this question because I have a similar issue. 

When do we need to control the parallelism manually? Skewed partitions?






Re: Spark 2.4 partitions and tasks

2019-02-12 Thread Pedro Tuero
* It is not getPartitions() but getNumPartitions().

On Tue, Feb 12, 2019 at 1:08 PM Pedro Tuero (tuerope...@gmail.com) wrote:

> And this is happening in every job I run. It is not just one case. If I
> add a forced repartition it works fine, even better than before. But I run
> the same code for different inputs, so the number of partitions to use must
> be related to the input.
>
>
> On Tue, Feb 12, 2019 at 11:22 AM Pedro Tuero (tuerope...@gmail.com) wrote:
>
>> Hi Jacek.
>> I'm not using Spark SQL, I'm using the RDD API directly.
>> I can confirm that the jobs and stages are the same in both executions.
>> In the Environment tab of the web UI, spark.default.parallelism=128 is
>> shown when using Spark 2.4, while in 2.3.1 it is not.
>> But in 2.3.1 it should be the same, because 128 is the number of cores in
>> the cluster * 2, and that didn't change in the latest version.
>>
>> In the example I gave, 5580 is the number of parts left by a previous job
>> in S3, as Hadoop sequence files. So the initial RDD has 5580 partitions.
>> While in 2.3.1 RDDs created by transformations from the initial RDD keep
>> the same number of partitions, in 2.4 the number of partitions resets to
>> the default.
>> So RDD1, the product of the first mapToPair, prints 5580 when
>> getPartitions() is called in 2.3.1, while it prints 128 in 2.4.
>>
>> Regards,
>> Pedro
>>
>>
>> On Tue, Feb 12, 2019 at 9:13 AM Jacek Laskowski (ja...@japila.pl) wrote:
>>
>>> Hi,
>>>
>>> Can you show the plans with explain(extended=true) for both versions?
>>> That's where I'd start to pinpoint the issue. Perhaps the underlying
>>> execution engine changed in a way that affects keyBy? Dunno, just guessing...
>>>
>>> Regards,
>>> Jacek Laskowski
>>> 
>>> https://about.me/JacekLaskowski
>>> Mastering Spark SQL https://bit.ly/mastering-spark-sql
>>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>>> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Fri, Feb 8, 2019 at 5:09 PM Pedro Tuero  wrote:
>>>
 I did a repartition to 1 (hardcoded) before the keyBy and it ends
 in 1.2 minutes.
 The questions remain open, because I don't want to hardcode parallelism.

 On Fri, Feb 8, 2019 at 12:50 PM Pedro Tuero (tuerope...@gmail.com) wrote:

> 128 is the default parallelism defined for the cluster.
> The question now is why the keyBy operation is using the default parallelism
> instead of the number of partitions of the RDD created by the previous step
> (5580).
> Any clues?
>
> On Thu, Feb 7, 2019 at 3:30 PM Pedro Tuero (tuerope...@gmail.com) wrote:
>
>> Hi,
>> I am running a job in Spark (using AWS EMR) and some stages are taking a
>> lot longer with Spark 2.4 than with Spark 2.3.1:
>>
>> Spark 2.4:
>> [image: image.png]
>>
>> Spark 2.3.1:
>> [image: image.png]
>>
>> With Spark 2.4, the keyBy operation takes more than 10X as long as it did
>> with Spark 2.3.1.
>> It seems to be related to the number of tasks / partitions.
>>
>> Questions:
>> - Isn't the number of tasks of a job supposed to be related to the number
>> of partitions of the RDD left by the previous job? Did that change in
>> version 2.4?
>> - Which tools or configuration options could I try to fix this severe
>> performance degradation?
>>
>> Thanks.
>> Pedro.
>>
>


Re: Spark 2.4 partitions and tasks

2019-02-12 Thread Pedro Tuero
And this is happening in every job I run. It is not just one case. If I add
a forced repartition it works fine, even better than before. But I run the
same code for different inputs, so the number of partitions to use must be
related to the input.
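For illustration, a rough sketch (Java RDD API; the input path and the key
function are placeholders) of deriving that number from the input itself, so
the same code adapts to inputs of different sizes instead of relying on a
hardcoded value or on spark.default.parallelism:

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class InputDrivenParallelism {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("input-driven-parallelism");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      // args[0] would carry the real input path; any text input works here.
      JavaRDD<String> input = sc.textFile(args[0]);

      // The "forced repartition" count comes from the input, not a constant.
      int parts = input.getNumPartitions();

      JavaPairRDD<Integer, String> keyed = input
          .keyBy(String::length)
          .partitionBy(new HashPartitioner(parts));

      System.out.println("input = " + parts
          + ", keyed = " + keyed.getNumPartitions());
    }
  }
}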


On Tue, Feb 12, 2019 at 11:22 AM Pedro Tuero (tuerope...@gmail.com) wrote:

> Hi Jacek.
> I'm not using Spark SQL, I'm using the RDD API directly.
> I can confirm that the jobs and stages are the same in both executions.
> In the Environment tab of the web UI, spark.default.parallelism=128 is shown
> when using Spark 2.4, while in 2.3.1 it is not.
> But in 2.3.1 it should be the same, because 128 is the number of cores in
> the cluster * 2, and that didn't change in the latest version.
>
> In the example I gave, 5580 is the number of parts left by a previous job
> in S3, as Hadoop sequence files. So the initial RDD has 5580 partitions.
> While in 2.3.1 RDDs created by transformations from the initial RDD keep
> the same number of partitions, in 2.4 the number of partitions resets to
> the default.
> So RDD1, the product of the first mapToPair, prints 5580 when
> getPartitions() is called in 2.3.1, while it prints 128 in 2.4.
>
> Regards,
> Pedro
>
>
> On Tue, Feb 12, 2019 at 9:13 AM Jacek Laskowski (ja...@japila.pl) wrote:
>
>> Hi,
>>
>> Can you show the plans with explain(extended=true) for both versions?
>> That's where I'd start to pinpoint the issue. Perhaps the underlying
>> execution engine changed in a way that affects keyBy? Dunno, just guessing...
>>
>> Regards,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> Mastering Spark SQL https://bit.ly/mastering-spark-sql
>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Fri, Feb 8, 2019 at 5:09 PM Pedro Tuero  wrote:
>>
>>> I did a repartition to 1 (hardcoded) before the keyBy and it ends in
>>> 1.2 minutes.
>>> The questions remain open, because I don't want to hardcode parallelism.
>>>
>>> On Fri, Feb 8, 2019 at 12:50 PM Pedro Tuero (tuerope...@gmail.com) wrote:
>>>
 128 is the default parallelism defined for the cluster.
 The question now is why the keyBy operation is using the default parallelism
 instead of the number of partitions of the RDD created by the previous step
 (5580).
 Any clues?

 On Thu, Feb 7, 2019 at 3:30 PM Pedro Tuero (tuerope...@gmail.com) wrote:

> Hi,
> I am running a job in Spark (using AWS EMR) and some stages are taking a
> lot longer with Spark 2.4 than with Spark 2.3.1:
>
> Spark 2.4:
> [image: image.png]
>
> Spark 2.3.1:
> [image: image.png]
>
> With Spark 2.4, the keyBy operation takes more than 10X as long as it did
> with Spark 2.3.1.
> It seems to be related to the number of tasks / partitions.
>
> Questions:
> - Isn't the number of tasks of a job supposed to be related to the number
> of partitions of the RDD left by the previous job? Did that change in
> version 2.4?
> - Which tools or configuration options could I try to fix this severe
> performance degradation?
>
> Thanks.
> Pedro.
>



Re: Spark 2.4 partitions and tasks

2019-02-12 Thread Pedro Tuero
Hi Jacek.
I'm not using Spark SQL, I'm using the RDD API directly.
I can confirm that the jobs and stages are the same in both executions.
In the Environment tab of the web UI, spark.default.parallelism=128 is shown
when using Spark 2.4, while in 2.3.1 it is not.
But in 2.3.1 it should be the same, because 128 is the number of cores in the
cluster * 2, and that didn't change in the latest version.

In the example I gave, 5580 is the number of parts left by a previous job in
S3, as Hadoop sequence files. So the initial RDD has 5580 partitions.
While in 2.3.1 RDDs created by transformations from the initial RDD keep the
same number of partitions, in 2.4 the number of partitions resets to the
default.
So RDD1, the product of the first mapToPair, prints 5580 when getPartitions()
is called in 2.3.1, while it prints 128 in 2.4.
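For illustration, a minimal sketch of that check (Java RDD API; the S3 path and
the Writable types are placeholders, not the real job; getNumPartitions() is
the actual method name, as corrected elsewhere in this thread):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class PartitionPropagationCheck {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("partition-propagation-check");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      // args[0]: the S3 prefix holding the sequence-file parts left by the
      // previous job (5580 of them in the case described above).
      JavaPairRDD<LongWritable, Text> input =
          sc.sequenceFile(args[0], LongWritable.class, Text.class);

      // mapToPair is a narrow transformation, so RDD1 is expected to keep the
      // parent's partition count (5580 here on 2.3.1, versus 128, the default
      // parallelism, observed on 2.4).
      JavaPairRDD<String, Long> rdd1 =
          input.mapToPair(kv -> new Tuple2<>(kv._2().toString(), 1L));

      System.out.println("input partitions = " + input.getNumPartitions());
      System.out.println("rdd1 partitions  = " + rdd1.getNumPartitions());
    }
  }
}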

Regards,
Pedro


On Tue, Feb 12, 2019 at 9:13 AM Jacek Laskowski (ja...@japila.pl) wrote:

> Hi,
>
> Can you show the plans with explain(extended=true) for both versions?
> That's where I'd start to pinpoint the issue. Perhaps the underlying
> execution engine changed in a way that affects keyBy? Dunno, just guessing...
>
> Regards,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Fri, Feb 8, 2019 at 5:09 PM Pedro Tuero  wrote:
>
>> I did a repartition to 1 (hardcoded) before the keyBy and it ends in
>> 1.2 minutes.
>> The questions remain open, because I don't want to hardcode parallelism.
>>
>> On Fri, Feb 8, 2019 at 12:50 PM Pedro Tuero (tuerope...@gmail.com) wrote:
>>
>>> 128 is the default parallelism defined for the cluster.
>>> The question now is why the keyBy operation is using the default parallelism
>>> instead of the number of partitions of the RDD created by the previous step
>>> (5580).
>>> Any clues?
>>>
>>> On Thu, Feb 7, 2019 at 3:30 PM Pedro Tuero (tuerope...@gmail.com) wrote:
>>>
 Hi,
 I am running a job in Spark (using AWS EMR) and some stages are taking a
 lot longer with Spark 2.4 than with Spark 2.3.1:

 Spark 2.4:
 [image: image.png]

 Spark 2.3.1:
 [image: image.png]

 With Spark 2.4, the keyBy operation takes more than 10X as long as it did
 with Spark 2.3.1.
 It seems to be related to the number of tasks / partitions.

 Questions:
 - Isn't the number of tasks of a job supposed to be related to the number
 of partitions of the RDD left by the previous job? Did that change in
 version 2.4?
 - Which tools or configuration options could I try to fix this severe
 performance degradation?

 Thanks.
 Pedro.

>>>


Re: Spark 2.4 partitions and tasks

2019-02-12 Thread Jacek Laskowski
Hi,

Can you show the plans with explain(extended=true) for both versions?
That's where I'd start to pinpoint the issue. Perhaps the underlying
execution engine changed in a way that affects keyBy? Dunno, just guessing...
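
Something along these lines (illustrative only; the Dataset here is a stand-in
for whatever you have, and rdd.toDebugString() is only the rough RDD-side
counterpart, since this job uses the RDD API rather than SQL):

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PlanInspection {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("plan-inspection").getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Dataset/SQL side: prints the parsed, analyzed, optimized and physical
    // plans, which is what a version-to-version comparison would need.
    Dataset<Row> df = spark.range(1000).toDF("id");
    df.groupBy("id").count().explain(true);

    // RDD side: toDebugString prints the lineage together with the partition
    // count of each RDD in it, which shows where a count like 5580 changes.
    JavaRDD<Long> rdd = jsc.parallelize(Arrays.asList(1L, 2L, 3L));
    System.out.println(rdd.keyBy(x -> x % 10).toDebugString());

    spark.stop();
  }
}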

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Fri, Feb 8, 2019 at 5:09 PM Pedro Tuero  wrote:

> I did a repartition to 1 (hardcoded) before the keyBy and it ends in
> 1.2 minutes.
> The questions remain open, because I don't want to hardcode parallelism.
>
> On Fri, Feb 8, 2019 at 12:50 PM Pedro Tuero (tuerope...@gmail.com) wrote:
>
>> 128 is the default parallelism defined for the cluster.
>> The question now is why the keyBy operation is using the default parallelism
>> instead of the number of partitions of the RDD created by the previous step
>> (5580).
>> Any clues?
>>
>> On Thu, Feb 7, 2019 at 3:30 PM Pedro Tuero (tuerope...@gmail.com) wrote:
>>
>>> Hi,
>>> I am running a job in Spark (using AWS EMR) and some stages are taking a
>>> lot longer with Spark 2.4 than with Spark 2.3.1:
>>>
>>> Spark 2.4:
>>> [image: image.png]
>>>
>>> Spark 2.3.1:
>>> [image: image.png]
>>>
>>> With Spark 2.4, the keyBy operation takes more than 10X as long as it did
>>> with Spark 2.3.1.
>>> It seems to be related to the number of tasks / partitions.
>>>
>>> Questions:
>>> - Isn't the number of tasks of a job supposed to be related to the number
>>> of partitions of the RDD left by the previous job? Did that change in
>>> version 2.4?
>>> - Which tools or configuration options could I try to fix this severe
>>> performance degradation?
>>>
>>> Thanks.
>>> Pedro.
>>>
>>


Re: Spark 2.4 partitions and tasks

2019-02-08 Thread Pedro Tuero
I did a repartition to 1 (hardcoded) before the keyBy and it ends in
1.2 minutes.
The questions remain open, because I don't want to hardcode parallelism.

On Fri, Feb 8, 2019 at 12:50 PM Pedro Tuero (tuerope...@gmail.com) wrote:

> 128 is the default parallelism defined for the cluster.
> The question now is why the keyBy operation is using the default parallelism
> instead of the number of partitions of the RDD created by the previous step
> (5580).
> Any clues?
>
> On Thu, Feb 7, 2019 at 3:30 PM Pedro Tuero (tuerope...@gmail.com) wrote:
>
>> Hi,
>> I am running a job in Spark (using AWS EMR) and some stages are taking a
>> lot longer with Spark 2.4 than with Spark 2.3.1:
>>
>> Spark 2.4:
>> [image: image.png]
>>
>> Spark 2.3.1:
>> [image: image.png]
>>
>> With Spark 2.4, the keyBy operation takes more than 10X as long as it did
>> with Spark 2.3.1.
>> It seems to be related to the number of tasks / partitions.
>>
>> Questions:
>> - Isn't the number of tasks of a job supposed to be related to the number
>> of partitions of the RDD left by the previous job? Did that change in
>> version 2.4?
>> - Which tools or configuration options could I try to fix this severe
>> performance degradation?
>>
>> Thanks.
>> Pedro.
>>
>


Re: Spark 2.4 partitions and tasks

2019-02-08 Thread Pedro Tuero
128 is the default parallelism defined for the cluster.
The question now is why the keyBy operation is using the default parallelism
instead of the number of partitions of the RDD created by the previous step
(5580).
Any clues?
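
For what it's worth, a small sketch (illustrative values only) of checking both
numbers at runtime; keyBy by itself is a narrow transformation, so its partition
count is expected to follow the parent RDD rather than the default:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DefaultParallelismCheck {
  public static void main(String[] args) {
    // spark.default.parallelism could also be set explicitly here (or via
    // --conf on spark-submit) instead of relying on the cluster default.
    SparkConf conf = new SparkConf().setAppName("default-parallelism-check");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      // On this cluster it resolves to 128 (2 * the number of cores).
      System.out.println("defaultParallelism = " + sc.defaultParallelism());

      JavaRDD<String> previousStep =
          sc.parallelize(Arrays.asList("a", "bb", "ccc"));
      JavaPairRDD<Integer, String> keyed = previousStep.keyBy(String::length);
      System.out.println("previous step = " + previousStep.getNumPartitions()
          + ", after keyBy = " + keyed.getNumPartitions());
    }
  }
}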

On Thu, Feb 7, 2019 at 3:30 PM Pedro Tuero (tuerope...@gmail.com) wrote:

> Hi,
> I am running a job in Spark (using AWS EMR) and some stages are taking a
> lot longer with Spark 2.4 than with Spark 2.3.1:
>
> Spark 2.4:
> [image: image.png]
>
> Spark 2.3.1:
> [image: image.png]
>
> With Spark 2.4, the keyBy operation takes more than 10X as long as it did
> with Spark 2.3.1.
> It seems to be related to the number of tasks / partitions.
>
> Questions:
> - Isn't the number of tasks of a job supposed to be related to the number
> of partitions of the RDD left by the previous job? Did that change in
> version 2.4?
> - Which tools or configuration options could I try to fix this severe
> performance degradation?
>
> Thanks.
> Pedro.
>


Spark 2.4 partitions and tasks

2019-02-07 Thread Pedro Tuero
Hi,
I am running a job in Spark (using AWS EMR) and some stages are taking a
lot longer with Spark 2.4 than with Spark 2.3.1:

Spark 2.4:
[image: image.png]

Spark 2.3.1:
[image: image.png]

With Spark 2.4, the keyBy operation takes more than 10X as long as it did
with Spark 2.3.1.
It seems to be related to the number of tasks / partitions.

Questions:
- Isn't the number of tasks of a job supposed to be related to the number of
partitions of the RDD left by the previous job? Did that change in version
2.4?
- Which tools or configuration options could I try to fix this severe
performance degradation?

Thanks.
Pedro.