Are you sure that this is not related to the Cassandra inserts? Could you just do foreachRDD { _.count } instead to keep Cassandra out of the picture and then test this again?
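A minimal sketch of that isolation test (dstream stands in for the job's existing direct-Kafka stream; the name is assumed here, not taken from the thread):

    // Replace the Cassandra write with a simple count, which still forces the
    // Kafka read for every partition but removes the sink from the picture.
    dstream.foreachRDD { rdd =>
      val n = rdd.count()
      println(s"batch records: $n")
    }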
On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <atan...@adobe.com> wrote:

> Also check if the Kafka cluster is still balanced. Maybe one of the
> brokers manages too many partitions; all the work will stay on that
> executor unless you repartition right after kafka (and I'm not saying you
> should).
>
> Sent from my iPhone
>
> On 06 Oct 2015, at 22:17, Cody Koeninger <c...@koeninger.org> wrote:
>
> I'm not clear on what you're measuring. Can you post relevant code
> snippets including the measurement code?
>
> As far as kafka metrics, nothing currently. There is an info-level log
> message every time a kafka rdd iterator is instantiated:
>
>     log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
>       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>
> If you log once you're done with an iterator you should be able to see the
> delta.
>
> The other thing to try is reducing the number of parts involved in the job
> to isolate it ... first thing I'd do there is take cassandra out of the
> equation.
>
> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> The job is doing ETL from Kafka records to Cassandra. After a single
>> filtering stage on Spark, the 'TL' part is done using the
>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>>
>> We have metrics on the executor work which we collect and add together,
>> indicated here by 'local computation'. As you can see, we also measure how
>> much it cost us to measure :-)
>> See how the 'local work' times are comparable. What's not visible is the
>> task scheduling and consuming the data from Kafka, which becomes part of
>> the 'spark computation' part.
>>
>> The pattern we see is 1 fast, 1 slow, 1 fast, ... zig... zag...
>>
>> Are there metrics available somehow on the Kafka reading time?
>>
>>                            Slow Task   Fast Task
>> local computation          347.6       281.53
>> spark computation          6930        263
>> metric collection          70          138
>> wall clock process         7000        401
>> total records processed    4297        5002
>>
>> (time in ms)
>>
>> kr, Gerard.
>>
>> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> Can you say anything more about what the job is doing?
>>>
>>> First thing I'd do is try to get some metrics on the time taken by your
>>> code on the executors (e.g. when processing the iterator) to see if it's
>>> consistent between the two situations.
>>>
>>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We recently migrated our streaming jobs to the direct kafka receiver.
>>>> Our initial migration went quite fine, but now we are seeing a weird
>>>> zig-zag performance pattern we cannot explain.
>>>> In alternating fashion, one task takes about 1 second to finish and the
>>>> next takes 7 seconds, for a stable streaming rate.
>>>>
>>>> Here are comparable metrics for two successive tasks:
>>>>
>>>> *Slow*:
>>>>
>>>> Executor ID                               Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>>> 20151006-044141-2408867082-5050-21047-S0  dnode-3.hdfs.private:36863  22 s       3            0             3
>>>> 20151006-044141-2408867082-5050-21047-S1  dnode-0.hdfs.private:43812  40 s       11           0             11
>>>> 20151006-044141-2408867082-5050-21047-S4  dnode-5.hdfs.private:59945  49 s       10           0             10
>>>>
>>>> *Fast*:
>>>>
>>>> Executor ID                               Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>>> 20151006-044141-2408867082-5050-21047-S0  dnode-3.hdfs.private:36863  0.6 s      4            0             4
>>>> 20151006-044141-2408867082-5050-21047-S1  dnode-0.hdfs.private:43812  1 s        9            0             9
>>>> 20151006-044141-2408867082-5050-21047-S4  dnode-5.hdfs.private:59945  1 s        11           0             11
>>>>
>>>> We have some custom metrics that measure the wall-clock time of execution
>>>> of certain blocks of the job, like the time it takes to do the local
>>>> computations (RDD.foreachPartition closure) vs total time.
>>>> The difference between the slow and the fast task is in the
>>>> 'spark computation time', which is the wall clock for the task scheduling
>>>> (DStream.foreachRDD closure).
>>>>
>>>> e.g.
>>>> Slow task:
>>>> local computation time: 347.60968499999996, *spark computation time: 6930*,
>>>> metric collection: 70, total process: 7000, total_records: 4297
>>>>
>>>> Fast task:
>>>> local computation time: 281.539042, *spark computation time: 263*,
>>>> metric collection: 138, total process: 401, total_records: 5002
>>>>
>>>> We are currently running Spark 1.4.1. The load and the work to be done
>>>> are stable - this is on a dev env with that stuff under control.
>>>>
>>>> Any ideas what this behavior could be?
>>>>
>>>> thanks in advance, Gerard.
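For reference, a rough sketch that combines the iterator-completion logging Cody suggests with the kind of wall-clock split Gerard describes; process, the log messages, and the use of TaskContext are illustrative assumptions, not the actual job code:

    import org.apache.spark.TaskContext

    dstream.foreachRDD { rdd =>
      val batchStart = System.currentTimeMillis()     // start of the DStream.foreachRDD closure
      rdd.foreachPartition { iter =>
        val localStart = System.currentTimeMillis()   // start of the RDD.foreachPartition closure
        iter.foreach(record => process(record))       // 'process' stands in for the 'TL' work
        // The KafkaRDD iterator is now exhausted; the delta between this line's timestamp
        // and the "Computing topic ..." line logged when the iterator was created
        // covers the Kafka read plus the per-record work.
        val part = TaskContext.get.partitionId
        println(s"partition $part done, local time: ${System.currentTimeMillis() - localStart} ms")
      }
      // Whatever is not accounted for by the local times (task scheduling plus
      // Kafka consumption) ends up in this 'spark computation' figure.
      println(s"batch wall clock: ${System.currentTimeMillis() - batchStart} ms")
    }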