Hi Vijay,

Thank you very much for your reply. Setting the number of partitions
explicitly in the join, and memory pressure influence on partitioning were
definitely very good insights.

At the end, we avoid the issue of uneven load balancing completely by doing
the following two:
a) Reducing the number of executors, and increasing the number of cores and
executor memory
b) Increasing the batch interval size from 15s to 30s.

Here is a nice blog post that explains how to improve performance for Spark
jobs in general:
https://mapr.com/blog/performance-tuning-apache-kafkaspark-streaming-system/
.

@Vijay: And here are the responses to your questions:
1) Correct.

2) This is exactly what confuses us: There is nothing between the following
lines:
df = sqlContext.read.parquet(...)
and
RDD rdd = df.as[T].rdd

We saw that a separate query plan is executed on converting DataFrame to
RDD (.rdd method). Is it equivalent to repartition, coalesce or something
else?

3) Exactly.

4) We are caching the static rdd for 30 minutes. That is, we have a trait
with readLast method that returns the last read RDD, and once the RDD is
more than 30 minutes old, we reload its content from disk using  df =
sqlContext.read.parquet(...).

-------

My final question is the following: What would be the most efficient way
(including possibly an external key-value store) for efficient store,
update and retrieval of final_rdd? The state may grow beyond 3GB, and we
want to maintain our scalability and latency. In fact, we have many Spark
jobs that join the same RDD with different Kafka streams.

Thank you very much!

On Wed, Jan 31, 2018 at 11:24 AM, vijay.bvp <bvpsa...@gmail.com> wrote:

> Summarizing
>
> 1) Static data set read from Parquet files as DataFrame in HDFS has initial
> parallelism of 90 (based on no input files)
>
> 2) static data set DataFrame is converted as rdd, and rdd has parallelism
> of
> 18 this was not expected
> dataframe.rdd is lazy evaluation there must be some operation you were
> doing
> that would have triggered
> conversion from 90 to 18, this would be some operation that breaks
> stage/requires shuffling such as groupby, reduceby, repartition,coalesce
> if you are using coalesce, the second parameter shuff is by default false
> which means upstream parallelism is not preserved.
>
> 3) you have DStream of Kafka source with 9 partitions this is joined with
> above static data set? when joining have you tried setting up numPartitions
> an optional parameter to provide no of partitions required.
>
> 4) your batch interval is 15 seconds but you are caching the static data
> set
> for 30 minutes, what exactly you mean caching for 30 minutes?
>
> Note when you cache data based on the memory pressure there is chance that
> partitioning is not preserved.
>
> it would be useful to provide spark UI screen shots for one complete batch,
> the DAG and other details
>
> thanks
> Vijay
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

Reply via email to