Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-02-23 Thread vijay.bvp
Thanks for adding the RDD lineage graph.
I could see 18 parallel tasks for the HDFS read; was it changed?


What is the Spark job configuration: how many executors, and how many cores per
executor?

I would say keep the partitioning a multiple of (number of executors * cores per
executor) for all the RDDs.

If you have 3 executors with 3 cores each assigned to the job, 9 parallel tasks
are possible, so set the repartitioning on the RDDs to a multiple of 9:

spark.read.parquet(...).repartition(27)
kafkaDStream.repartition(27)   // i.e. the DStream created from Kafka

Coalesce with shuffle = false will actually cause problems with upstream
parallelism (the upstream stage gets narrowed as well).
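
For example, something along these lines (a rough sketch only, assuming a
spark-shell style SparkSession named spark; the path and the kafkaDStream
variable are placeholders):

val totalCores = 3 * 3                                // executors * cores per executor
val numPartitions = totalCores * 3                    // 27, a multiple of the total cores

// static side: repartition the parquet-backed RDD
val staticRdd = spark.read.parquet("/path/to/static").rdd.repartition(numPartitions)

// streaming side: repartition each batch RDD of the Kafka DStream to the same count
val repartitionedStream = kafkaDStream.transform(_.repartition(numPartitions))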

please test the above scenario and share the findings.






Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-02-20 Thread LongVehicle
Hi Vijay,

Thanks for the follow-up.

The reason why we have 90 HDFS files (causing the parallelism of 90 for the HDFS
read stage) is that we load the same HDFS data in different jobs, and these jobs
have parallelisms (executors x cores) of 9, 18, and 30. The uneven assignment
problem that we had before could not be explained by the modulo
operation/remainder, because we sometimes had only 2 executors active out of 9
(while the remaining 7 would stay completely idle).

We tried to repartition the Kafka stream to 90 partitions, but it led to an even
worse imbalance in the load. It seems that keeping the number of partitions
equal to executors x cores reduces the chance of uneven assignment.

We also tried to repartition the HDFS data to 9 partitions, but it did not
help, because repartition takes into account the initial locality of data,
so 9 partitions may end up on 9 different cores. We also tried to set
spark.shuffle.reduceLocality.enabled=false, but it did not help. Last but
not least, we want to avoid coalesce, because then partitions would depend on
the HDFS block distribution, so they would not be hash partitioned (which we
need for the join).
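
(For context, the hash partitioning we rely on for the join is essentially the
following; this is a simplified sketch rather than our actual code, and
keyedStaticRdd is a placeholder for the static RDD already keyed by the join
column.)

import org.apache.spark.HashPartitioner
// explicitly hash-partition the static side on the join key
val hashedStatic = keyedStaticRdd.partitionBy(new HashPartitioner(9))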

Please find below the relevant UI snapshots:
[UI snapshot images not preserved in this text archive]

The snapshots refer to the batch in which the RDD is reloaded (WholeStageCodegen
1147 is gray except in the batch at reload time, which happens every 30
minutes).

Thanks a lot!






Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-02-19 Thread vijay.bvp
apologies for the long answer. 

Understanding partitioning at each stage of the RDD graph/lineage is important
for efficient parallelism and balanced load. This applies to working with any
source, streaming or static.
You have a tricky situation here: one Kafka source with 9 partitions and a
static data set with 90 partitions.

Before joining these, try to have an equal number of partitions for both RDDs.
You can either repartition the Kafka source to 90 partitions, coalesce the
flat-file RDD to 9 partitions, or pick something midway between 9 and 90.

In general, the number of tasks that can run in parallel equals the total number
of cores the Spark job has (number of executors * cores per executor).

As an example, if the flat file has 90 partitions and you set 4 executors each
with 5 cores, for a total of 20 cores, the tasks get scheduled in waves of
20+20+20+20+10. As you can see, the last wave has only 10 tasks even though you
have 20 cores.

Compare this with 6 executors each with 5 cores, for a total of 30 cores: then
it would be 30+30+30.

Ideally, the number of partitions for each RDD (in the graph lineage) should be
a multiple of the total number of cores available to the Spark job.
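
A quick way to check both numbers from a spark-shell (just a sketch; the path is
a placeholder):

// task slots available to the job (typically executors * cores per executor)
val totalCores = sc.defaultParallelism
// partition count of the static side; ideally a multiple of totalCores
val staticParts = sqlContext.read.parquet("/path/to/static").rdd.getNumPartitions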

In terms of data locality, prefer process-local over node-local over rack-local.
As an example, 5 executors with 4 cores or 4 executors with 5 cores will both
give 20 cores in total, but with 4 executors there is less shuffling and more
process-local/node-local scheduling.

We need to look at the RDD graph for this:
val df = sqlContext.read.parquet(...)
val rdd = df.as[T].rdd
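
For example, to see where the partition count changes (a sketch using the two
values above):

df.explain(true)            // physical plan behind the DataFrame
println(rdd.toDebugString)  // RDD lineage, with partition counts per stage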


On your final question: you should be able to tune the static RDD without an
external store by carefully looking at each batch's RDD lineage during the 30
minutes before the RDD gets refreshed again.

If you would like to use an external system, Apache Ignite is something you can
use as a cache.

thanks
Vijay











Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-02-19 Thread Aleksandar Vitorovic
 Hi Vijay,

Thank you very much for your reply. Setting the number of partitions explicitly
in the join, and the influence of memory pressure on partitioning, were
definitely very good insights.

In the end, we avoided the issue of uneven load balancing completely by doing
the following two things:
a) Reducing the number of executors, and increasing the number of cores and
the memory per executor.
b) Increasing the batch interval from 15s to 30s (sketched below).
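
Roughly, in configuration terms (a sketch; the numbers below are illustrative
placeholders, not our production values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-join")             // placeholder name
  .set("spark.executor.instances", "3")     // fewer executors...
  .set("spark.executor.cores", "3")         // ...with more cores each
  .set("spark.executor.memory", "8g")       // and more memory per executor
val ssc = new StreamingContext(conf, Seconds(30))   // batch interval raised from 15s to 30s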

Here is a nice blog post that explains how to improve the performance of Spark
jobs in general:
https://mapr.com/blog/performance-tuning-apache-kafkaspark-streaming-system/

@Vijay: And here are the responses to your questions:
1) Correct.

2) This is exactly what confuses us: There is nothing between the following
lines:
val df = sqlContext.read.parquet(...)
val rdd = df.as[T].rdd

We saw that a separate query plan is executed when converting the DataFrame to
an RDD (the .rdd method). Is it equivalent to repartition, coalesce, or
something else?

3) Exactly.

4) We are caching the static RDD for 30 minutes. That is, we have a trait with a
readLast method that returns the last-read RDD, and once the RDD is more than 30
minutes old, we reload its content from disk using df =
sqlContext.read.parquet(...).
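
In rough form, the pattern looks like this (a simplified sketch; the class name
and the load function are placeholders, not our actual code):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

class StaticRdd[T](load: () => RDD[T], ttlMs: Long = 30 * 60 * 1000L) {
  @volatile private var cached: (RDD[T], Long) = null

  def readLast(): RDD[T] = synchronized {
    val now = System.currentTimeMillis()
    if (cached == null || now - cached._2 > ttlMs) {
      if (cached != null) cached._1.unpersist(blocking = false)   // drop the stale copy
      cached = (load().persist(StorageLevel.MEMORY_ONLY_SER), now)
    }
    cached._1
  }
}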

---

My final question is the following: what would be the most efficient way
(possibly including an external key-value store) to store, update, and retrieve
final_rdd? The state may grow beyond 3GB, and we want to maintain our
scalability and latency. In fact, we have many Spark jobs that join the same RDD
with different Kafka streams.

Thank you very much!



Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-01-31 Thread vijay.bvp
Summarizing

1) The static data set, read from Parquet files in HDFS as a DataFrame, has an
initial parallelism of 90 (based on the number of input files).

2) The static DataFrame is converted to an RDD, and the RDD has a parallelism of
18; this was not expected. dataframe.rdd is lazily evaluated, so there must be
some operation you were doing that triggered the change from 90 to 18. This
would be some operation that breaks the stage / requires shuffling, such as
groupBy, reduceBy, repartition, or coalesce. If you are using coalesce, note
that its second parameter, shuffle, is false by default, which means upstream
parallelism is not preserved (see the sketch below).

3) You have a DStream from a Kafka source with 9 partitions, and this is joined
with the static data set above? When joining, have you tried setting
numPartitions, an optional parameter that specifies the number of partitions
required (also sketched below)?

4) Your batch interval is 15 seconds but you are caching the static data set for
30 minutes; what exactly do you mean by caching for 30 minutes?

Note that when you cache data, depending on memory pressure, there is a chance
that partitioning is not preserved.
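
For points 2) and 3), a small sketch of what I mean (rdd, kafkaPairRdd and
staticPairRdd are placeholders for your RDDs):

// 2) coalesce defaults to shuffle = false, which also narrows the upstream stage;
//    repartition(n) is shorthand for coalesce(n, shuffle = true)
val narrow   = rdd.coalesce(18)                    // whole upstream stage runs with 18 tasks
val shuffled = rdd.coalesce(18, shuffle = true)    // upstream keeps its own parallelism

// 3) leftOuterJoin takes an optional numPartitions argument
val joined = kafkaPairRdd.leftOuterJoin(staticPairRdd, 18)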

It would be useful to provide Spark UI screenshots for one complete batch, the
DAG, and other details.

thanks
Vijay






[Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-01-30 Thread LongVehicle
Hello everyone,

We are running Spark Streaming jobs on Spark 2.1 in cluster mode on YARN. We
have an RDD (3GB) that we periodically (every 30 min) refresh by reading from
HDFS. Namely, we create a DataFrame df using sqlContext.read.parquet, and then
we create RDD rdd = df.as[T].rdd. The first unexpected thing is that although df
has a parallelism of 90 (because we read that many HDFS files), rdd has a
parallelism of 18 (executors x cores = 9 x 2 in our setup). In the final stage,
we repartition rdd using the HashPartitioner and a parallelism of 18 (we denote
the result final_rdd), and cache it using MEMORY_ONLY_SER for 30 minutes. We
repartition the rdd using the same key that was used to partition the HDFS files
in the first place. Finally, we leftOuterJoin the DStream of 9 Kafka partitions
(which are 300MB in total) with final_rdd (3GB). This DStream is partitioned by
the same (join) key. Our batch interval is 15 seconds, and we read new data from
Kafka in each batch.
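
A rough sketch of the pipeline (simplified; the Event type, the key field, the
path, and the kafkaStream variable are placeholders, not our actual code):

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel
import sqlContext.implicits._

case class Event(key: String, payload: String)     // placeholder for the real type T

val df  = sqlContext.read.parquet("hdfs:///path/to/static")   // ~90 files, parallelism 90
val rdd = df.as[Event].rdd                                    // observed parallelism: 18
val finalRdd = rdd
  .map(e => (e.key, e))                                       // same key as the HDFS partitioning
  .partitionBy(new HashPartitioner(18))
  .persist(StorageLevel.MEMORY_ONLY_SER)

// every 15-second batch from the 9-partition Kafka DStream is joined against finalRdd
val joinedStream = kafkaStream.transform(_.map(r => (r.key, r)).leftOuterJoin(finalRdd))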
   
We noticed that final_rdd is sometimes (non-deterministically) unevenly
scheduled across executors, and sometimes only 1 or 2 executors are executing
all the tasks. The problem with the uneven assignment is that it persists until
we reload the HDFS data (for half an hour). Why is this happening? We are aware
that Spark uses locality when assigning tasks to executors, but we also tried to
set spark.shuffle.reduceLocality.enabled=false. Unfortunately, this did not
help, either for rdd or for final_rdd.

Any ideas how to address the problem?

Many thanks!


