Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-07 Thread Gerard Maas
Thanks for the feedback. Cassandra does not seem to be the issue. The time for writing to Cassandra is in the same order of magnitude (see below). The code structure is roughly as follows: dstream.filter(pred).foreachRDD{rdd => val sparkT0 = currentTimeMs val metrics =
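A minimal sketch of how that timing structure could look; pred, Record, writeToCassandra and recordMetrics are hypothetical stand-ins for the elided job code, not the actual implementation:

    import org.apache.spark.streaming.dstream.DStream

    // Hypothetical record type and helpers, for illustration only.
    case class Record(key: String, value: String)
    def pred(r: Record): Boolean = r.value.nonEmpty
    def writeToCassandra(r: Record): Unit = ()   // stand-in for the real insert
    def recordMetrics(perPartition: Array[(Long, Long)], t0: Long, t1: Long): Unit =
      println(s"batch took ${t1 - t0} ms over ${perPartition.length} partitions")

    def instrument(dstream: DStream[Record]): Unit =
      dstream.filter(pred).foreachRDD { rdd =>
        val sparkT0 = System.currentTimeMillis()          // driver-side start
        val metrics = rdd.mapPartitions { partition =>
          val partitionT0 = System.currentTimeMillis()    // executor-side start
          partition.foreach(writeToCassandra)             // the per-record work
          val partitionT1 = System.currentTimeMillis()
          Iterator((partitionT0, partitionT1))
        }.collect()                                       // per-partition timings back to the driver
        val sparkT1 = System.currentTimeMillis()          // driver-side end
        recordMetrics(metrics, sparkT0, sparkT1)
      }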

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-07 Thread Cody Koeninger
When you say that the largest difference is from metrics.collect, how are you measuring that? Wouldn't that be the difference between max(partitionT1) and sparkT1, not sparkT0 and sparkT1? As for further places to look, what's happening in the logs during that time? Are the number of messages
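To make that distinction concrete, a hedged sketch reusing the hypothetical names from the sketch above (metrics, sparkT0, sparkT1): the executor work ends at max(partitionT1), so only the remaining gap up to sparkT1 is attributable to collect and scheduling.

    // metrics: Array[(Long, Long)] of (partitionT0, partitionT1), as in the earlier sketch
    val maxPartitionT1 = metrics.map(_._2).max
    val executorSpanMs = maxPartitionT1 - metrics.map(_._1).min   // earliest start to latest finish
    val driverGapMs    = sparkT1 - maxPartitionT1                 // collect + scheduling overhead
    val totalMs        = sparkT1 - sparkT0                        // what sparkT0..sparkT1 actually measures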

RE: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-07 Thread Goodall, Mark (UK)

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Gerard Maas
Hi Cody, The job is doing ETL from Kafka records to Cassandra. After a single filtering stage on Spark, the 'TL' part is done using the dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern. We have metrics on the executor work which we collect and add together, indicated here by 'local
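A minimal sketch of that foreachRDD/foreachPartition shape, where dstream stands for the filtered stream mentioned above; the session handling is a hypothetical stand-in, not the actual Cassandra connector code:

    // Hypothetical session abstraction, only to show the per-partition write pattern.
    trait CassandraSession { def insert(row: String): Unit; def close(): Unit }
    def openSession(): CassandraSession = new CassandraSession {
      def insert(row: String): Unit = ()
      def close(): Unit = ()
    }

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val session = openSession()                 // one session per partition
        try records.foreach(r => session.insert(r.toString))
        finally session.close()
      }
    }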

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Cody Koeninger
Can you say anything more about what the job is doing? First thing I'd do is try to get some metrics on the time taken by your code on the executors (e.g. when processing the iterator) to see if it's consistent between the two situations. On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas
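One way to gather the executor-side timing Cody suggests, as a hedged sketch; it only counts records, so the real per-record work would go where the counting loop is:

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        val t0 = System.nanoTime()
        var n = 0L
        iter.foreach(_ => n += 1)                   // replace with the actual per-record work
        val elapsedMs = (System.nanoTime() - t0) / 1000000
        // shows up in the executor stdout logs, so the two situations can be compared
        println(s"processed $n records in $elapsedMs ms")
      }
    }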

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Cody Koeninger
I'm not clear on what you're measuring. Can you post relevant code snippets including the measurement code? As far as kafka metrics, nothing currently. There is an info-level log message every time a kafka rdd iterator is instantiated, log.info(s"Computing topic ${part.topic}, partition
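To actually see that message, the Kafka RDD logger has to be at INFO level; one way to raise it programmatically, assuming the log4j 1.x setup Spark 1.4.x ships with and the org.apache.spark.streaming.kafka.KafkaRDD logger name:

    import org.apache.log4j.{Level, Logger}

    // Raise only the Kafka RDD logger, leaving the rest of the logging config alone.
    Logger.getLogger("org.apache.spark.streaming.kafka.KafkaRDD").setLevel(Level.INFO)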

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Jeff Nadler
Gerard - any chance this is related to task locality waiting? Can you try (just as a diagnostic) something like this, does the unexpected delay go away? .set("spark.locality.wait", "0") On Tue, Oct 6, 2015 at 12:00 PM, Gerard Maas wrote: > Hi Cody, > > The job is
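A hedged sketch of that diagnostic, applied where the SparkConf is built; the app name and the rest of the conf are placeholders:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("kafka-direct-etl")               // hypothetical app name
      .set("spark.locality.wait", "0")              // disable the locality wait, diagnostics only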

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Cody Koeninger
I agree getting cassandra out of the picture is a good first step. But if you just do foreachRDD { _.count } recent versions of direct stream shouldn't do any work at all on the executor (since the number of messages in the rdd is known already). Do a foreachPartition and println or count the
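A sketch of the two diagnostics being contrasted, with dstream standing for the direct Kafka stream: the first counts using the already-known message counts (so the executors stay idle), the second forces each Kafka partition iterator to actually be consumed.

    // 1) Driver-side count: for the direct stream the size is already known.
    dstream.foreachRDD { rdd => println(s"batch count: ${rdd.count()}") }

    // 2) Executor-side consumption: iterating forces the Kafka fetch on each partition.
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { iter => println(s"partition count: ${iter.size}") }
    }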

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Tathagata Das
Good point! On Tue, Oct 6, 2015 at 4:23 PM, Cody Koeninger wrote: > I agree getting cassandra out of the picture is a good first step. > > But if you just do foreachRDD { _.count } recent versions of direct stream > shouldn't do any work at all on the executor (since the

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Tathagata Das
Are you sure that this is not related to Cassandra inserts? Could you just do foreachRDD { _.count } instead to keep Cassandra out of the picture and then test this again. On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase wrote: > Also check if the Kafka cluster is still balanced.