Good point!

On Tue, Oct 6, 2015 at 4:23 PM, Cody Koeninger <c...@koeninger.org> wrote:
I agree getting Cassandra out of the picture is a good first step.

But if you just do foreachRDD { _.count }, recent versions of the direct
stream shouldn't do any work at all on the executor (since the number of
messages in the rdd is already known).

Do a foreachPartition and println or count the iterator manually.

On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das <t...@databricks.com> wrote:

Are you sure that this is not related to the Cassandra inserts? Could you
just do foreachRDD { _.count } instead, to keep Cassandra out of the
picture, and then test this again.

On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <atan...@adobe.com> wrote:

Also check whether the Kafka cluster is still balanced. If one of the
brokers manages too many partitions, all the work will stay on that
executor unless you repartition right after Kafka (and I'm not saying you
should).

On 06 Oct 2015, at 22:17, Cody Koeninger <c...@koeninger.org> wrote:

I'm not clear on what you're measuring. Can you post relevant code
snippets, including the measurement code?

As far as Kafka metrics, nothing currently. There is an info-level log
message every time a Kafka rdd iterator is instantiated:

    log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
      s"offsets ${part.fromOffset} -> ${part.untilOffset}")

If you log once you're done with an iterator, you should be able to see
the delta.

The other thing to try is to reduce the number of parts involved in the
job in order to isolate it ... the first thing I'd do there is take
Cassandra out of the equation.
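A minimal sketch of the check being suggested here, combining the two
suggestions above (consume each partition's iterator by hand and log a
wall-clock delta per partition, with Cassandra entirely out of the loop).
The variable `stream` is assumed to be the DStream returned by
KafkaUtils.createDirectStream; the names and log format are illustrative
only, not code from the thread:

    // assuming:  val stream = KafkaUtils.createDirectStream[...](...)
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        val start = System.currentTimeMillis()
        var n = 0L
        while (iter.hasNext) { iter.next(); n += 1 }  // forces the actual Kafka read
        println(s"read $n records in ${System.currentTimeMillis() - start} ms")
      }
    }

If the zig-zag shows up in these per-partition timings too, the Kafka read
itself is the suspect; if not, the downstream write is.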
On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

Hi Cody,

The job is doing ETL from Kafka records to Cassandra. After a single
filtering stage on Spark, the 'TL' part is done using the
dstream.foreachRDD { rdd.foreachPartition { ...TL... } } pattern.

We have metrics on the executor work, which we collect and add together,
indicated here by 'local computation'. As you can see, we also measure how
much it costs us to measure :-)
Note how the 'local computation' times are comparable. What's not visible
is the task scheduling and the consumption of data from Kafka, which
become part of the 'spark computation' figure.

The pattern we see is 1 fast, 1 slow, 1 fast, ... zig...zag...

Are there metrics available somehow on the Kafka reading time?

                              Slow Task   Fast Task
    local computation         347.6       281.53
    spark computation         6930        263
    metric collection         70          138
    wall clock process        7000        401
    total records processed   4297        5002

(time in ms)

kr, Gerard.

On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <c...@koeninger.org> wrote:

Can you say anything more about what the job is doing?

The first thing I'd do is try to get some metrics on the time taken by
your code on the executors (e.g. when processing the iterator) to see if
it's consistent between the two situations.

On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com> wrote:

Hi,

We recently migrated our streaming jobs to the direct Kafka receiver. Our
initial migration went quite fine, but now we are seeing a weird zig-zag
performance pattern we cannot explain.
In alternating fashion, one task takes about 1 second to finish and the
next takes 7 seconds, at a stable streaming rate.

Here are comparable metrics for two successive tasks:

*Slow*:

    Executor ID                               Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
    20151006-044141-2408867082-5050-21047-S0  dnode-3.hdfs.private:36863  22 s       3            0             3
    20151006-044141-2408867082-5050-21047-S1  dnode-0.hdfs.private:43812  40 s       11           0             11
    20151006-044141-2408867082-5050-21047-S4  dnode-5.hdfs.private:59945  49 s       10           0             10

*Fast*:

    Executor ID                               Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
    20151006-044141-2408867082-5050-21047-S0  dnode-3.hdfs.private:36863  0.6 s      4            0             4
    20151006-044141-2408867082-5050-21047-S1  dnode-0.hdfs.private:43812  1 s        9            0             9
    20151006-044141-2408867082-5050-21047-S4  dnode-5.hdfs.private:59945  1 s        11           0             11

We have some custom metrics that measure the wall-clock time of execution
of certain blocks of the job, like the time it takes to do the local
computations (RDD.foreachPartition closure) vs. the total time.
The difference between the slow and the fast task is in the 'spark
computation time', which is the wall-clock time for the task scheduling
(DStream.foreachRDD closure).

e.g.
Slow task:

    local computation time: 347.60968499999996, *spark computation time:
    6930*, metric collection: 70, total process: 7000, total_records: 4297

Fast task:

    local computation time: 281.539042, *spark computation time: 263*,
    metric collection: 138, total process: 401, total_records: 5002

We are currently running Spark 1.4.1. The load and the work to be done are
stable; this is on a dev env with that stuff under control.

Any ideas what could cause this behavior?

thanks in advance, Gerard.
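A minimal sketch of how the 'local computation' vs. 'spark computation'
split described above might be instrumented, assuming the same
foreachRDD/foreachPartition structure. `dstream` and `writeToCassandra`
are hypothetical stand-ins for the filtered stream and the 'TL' step, not
Gerard's actual code:

    // assuming:  val dstream: DStream[Record] = ...        (the filtered stream)
    //            def writeToCassandra(r: Record): Unit = ... (hypothetical 'TL' step)
    dstream.foreachRDD { rdd =>
      val batchStart = System.currentTimeMillis()
      rdd.foreachPartition { iter =>
        val t0 = System.currentTimeMillis()
        iter.foreach(writeToCassandra)  // runs on the executor
        // 'local computation': work done inside the partition closure
        println(s"local computation: ${System.currentTimeMillis() - t0} ms")
      }
      // foreachPartition is an action, so all partitions have finished here;
      // the gap between this figure and the local times is task scheduling
      // plus the Kafka read
      println(s"spark computation: ${System.currentTimeMillis() - batchStart} ms")
    }

The per-partition line lands in the executor logs and the batch line in
the driver log, matching the two figures compared in the thread.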