Hi Vijay,

Thank you very much for your reply. Setting the number of partitions explicitly in the join, and the influence of memory pressure on partitioning, were very helpful insights.
In the end, we avoided the uneven load balancing issue entirely by doing two things: a) reducing the number of executors while increasing the number of cores and the memory per executor, and b) increasing the batch interval from 15s to 30s. Here is a nice blog post that explains how to improve performance of Spark jobs in general: https://mapr.com/blog/performance-tuning-apache-kafkaspark-streaming-system/

@Vijay: And here are the responses to your questions:

1) Correct.

2) This is exactly what confuses us: there is nothing between the following lines:

df = sqlContext.read.parquet(...)

and

val rdd = df.as[T].rdd

We saw that a separate query plan is executed when converting the DataFrame to an RDD (the .rdd method). Is it equivalent to repartition, coalesce, or something else?

3) Exactly.

4) We are caching the static RDD for 30 minutes. That is, we have a trait with a readLast method that returns the last RDD read; once that RDD is more than 30 minutes old, we reload its content from disk using df = sqlContext.read.parquet(...).

-------

My final question is the following: what would be the most efficient way (possibly including an external key-value store) to store, update, and retrieve final_rdd? The state may grow beyond 3 GB, and we want to maintain our scalability and latency. In fact, we have many Spark jobs that join the same RDD with different Kafka streams.

Thank you very much!
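For reference, the 30-minute reload pattern from point 4 looks roughly like this. This is a simplified sketch, not our production code: the trait name CachedStaticData, the path field, and the load() helper are illustrative, and only readLast matches a name from our actual code.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// Sketch of the reload-after-TTL pattern from point 4.
// All names except readLast are illustrative.
trait CachedStaticData[T] {
  def sqlContext: SQLContext
  def path: String                        // Parquet location on HDFS
  def ttlMillis: Long = 30 * 60 * 1000L   // 30 minutes

  @volatile private var lastRdd: Option[RDD[T]] = None
  @volatile private var loadedAt: Long = 0L

  // e.g. sqlContext.read.parquet(path).as[T].rdd in the implementing class
  protected def load(): RDD[T]

  def readLast(): RDD[T] = synchronized {
    val now = System.currentTimeMillis()
    if (lastRdd.isEmpty || now - loadedAt > ttlMillis) {
      lastRdd.foreach(_.unpersist())      // drop the stale copy from the cache
      val fresh = load().cache()
      lastRdd = Some(fresh)
      loadedAt = now
    }
    lastRdd.get
  }
}
```

One caveat with this pattern, which matches Vijay's note below: after a reload, the fresh RDD's partitioning is whatever load() produces, so any partitioner applied to the previous copy is not carried over automatically.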
On Wed, Jan 31, 2018 at 11:24 AM, vijay.bvp <bvpsa...@gmail.com> wrote:
> Summarizing:
>
> 1) The static data set, read from Parquet files on HDFS as a DataFrame, has an
> initial parallelism of 90 (based on the number of input files).
>
> 2) The static DataFrame is converted to an RDD, and the RDD has a parallelism
> of 18, which was not expected. dataframe.rdd is lazily evaluated; there must be
> some operation you were doing that triggered the conversion from 90 to 18. This
> would be an operation that breaks the stage / requires shuffling, such as
> groupBy, reduceBy, repartition, or coalesce. If you are using coalesce, note
> that its second parameter, shuffle, is false by default, which means upstream
> parallelism is not preserved.
>
> 3) You have a DStream from a Kafka source with 9 partitions, and this is joined
> with the static data set above? When joining, have you tried setting
> numPartitions, an optional parameter specifying the number of partitions
> required?
>
> 4) Your batch interval is 15 seconds, but you are caching the static data set
> for 30 minutes. What exactly do you mean by caching for 30 minutes?
>
> Note that when you cache data, depending on memory pressure, there is a chance
> that partitioning is not preserved.
>
> It would be useful to provide Spark UI screenshots for one complete batch, the
> DAG, and other details.
>
> thanks
> Vijay
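Following up on points 2 and 3 in Vijay's summary, a quick way to see where the parallelism drops, and to set the partition count explicitly when joining the stream with the static RDD. This is an illustrative sketch: the path, the Record case class, kafkaPairStream, keyedStaticRdd, and the partition count 90 are assumed names and values, not taken from the original job.

```scala
import org.apache.spark.HashPartitioner

// Illustrative sketch: check where parallelism changes, and pass the
// partition count explicitly when joining stream and static data.
val df = sqlContext.read.parquet("/data/static")   // path is illustrative
println(df.rdd.getNumPartitions)                   // e.g. 90, one per input file

val staticRdd = df.as[Record].rdd                  // .rdd plans a new query
println(staticRdd.getNumPartitions)                // may differ if a shuffle was planned

// In the stream join, pass a Partitioner (or numPartitions) explicitly
// instead of relying on the default, so both sides agree:
val joined = kafkaPairStream.transform { batchRdd =>
  batchRdd.join(keyedStaticRdd, new HashPartitioner(90))
}
```

Comparing getNumPartitions before and after the .as[T].rdd conversion is the cheapest way to confirm whether that step, rather than the join, is where 90 becomes 18.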