Gerard - any chance this is related to task locality waiting? Can you try something like this (just as a diagnostic) and see whether the unexpected delay goes away?

.set("spark.locality.wait", "0")
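In context that would be roughly the following - a minimal sketch only, assuming the conf is built in the driver; the object name, app name and batch interval are placeholders, not taken from your job:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object LocalityWaitDiagnostic {
      def main(args: Array[String]): Unit = {
        // Same conf as the existing job, with the locality wait disabled so the
        // scheduler never delays a task while hoping for a data-local executor slot.
        val conf = new SparkConf()
          .setAppName("kafka-to-cassandra-etl")   // placeholder; master comes from spark-submit
          .set("spark.locality.wait", "0")        // diagnostic only

        val ssc = new StreamingContext(conf, Seconds(5))  // placeholder batch interval

        // ... create the direct Kafka stream and the rest of the job as before ...

        ssc.start()
        ssc.awaitTermination()
      }
    }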
.set("spark.locality.wait", "0") On Tue, Oct 6, 2015 at 12:00 PM, Gerard Maas <gerard.m...@gmail.com> wrote: > Hi Cody, > > The job is doing ETL from Kafka records to Cassandra. After a > single filtering stage on Spark, the 'TL' part is done using the > dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern. > > We have metrics on the executor work which we collect and add together, > indicated here by 'local computation'. As you can see, we also measure how > much it cost us to measure :-) > See how 'local work' times are comparable. What's not visible is the > task scheduling and consuming the data from Kafka which becomes part of the > 'spark computation' part. > > The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag... > > Are there metrics available somehow on the Kafka reading time? > > Slow TaskFast Tasklocal computation347.6281.53spark computation6930263metric > collection70138wall clock process7000401total records processed42975002 > > (time in ms) > > kr, Gerard. > > > On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <c...@koeninger.org> wrote: > >> Can you say anything more about what the job is doing? >> >> First thing I'd do is try to get some metrics on the time taken by your >> code on the executors (e.g. when processing the iterator) to see if it's >> consistent between the two situations. >> >> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com> >> wrote: >> >>> Hi, >>> >>> We recently migrated our streaming jobs to the direct kafka receiver. >>> Our initial migration went quite fine but now we are seeing a weird zig-zag >>> performance pattern we cannot explain. >>> In alternating fashion, one task takes about 1 second to finish and the >>> next takes 7sec for a stable streaming rate. >>> >>> Here are comparable metrics for two successive tasks: >>> *Slow*: >>> >>> >>> >>> >>> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks >>> 20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:3686322 s30 >>> 320151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:4381240 s >>> 1101120151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:5994549 >>> s10010 >>> *Fast*: >>> >>> >>> >>> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks >>> 20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:368630.6 s4 >>> 0420151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:438121 s9 >>> 0920151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:599451 s >>> 11011 >>> We have some custom metrics that measure wall-clock time of execution of >>> certain blocks of the job, like the time it takes to do the local >>> computations (RDD.foreachPartition closure) vs total time. >>> The difference between the slow and fast executing task is on the 'spark >>> computation time' which is wall-clock for the task scheduling >>> (DStream.foreachRDD closure) >>> >>> e.g. >>> Slow task: >>> >>> local computation time: 347.60968499999996, *spark computation time: >>> 6930*, metric collection: 70, total process: 7000, total_records: 4297 >>> >>> Fast task: >>> local computation time: 281.539042,* spark computation time: 263*, >>> metric collection: 138, total process: 401, total_records: 5002 >>> >>> We are currently running Spark 1.4.1. The load and the work to be done >>> is stable -this is on a dev env with that stuff under control. >>> >>> Any ideas what this behavior could be? >>> >>> thanks in advance, Gerard. >>> >>> >>> >>> >>> >>> >>> >> >