Gerard - any chance this is related to task locality waiting?  Can you
try (just as a diagnostic) something like this, and see whether the unexpected
delay goes away?

.set("spark.locality.wait", "0")


On Tue, Oct 6, 2015 at 12:00 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi Cody,
>
> The job is doing ETL from Kafka records to Cassandra. After a
> single filtering stage in Spark, the 'TL' (transform and load) part is done
> using the dstream.foreachRDD { rdd.foreachPartition { ... TL ... } } pattern.
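>
> In case it helps, a rough sketch of that pattern (the predicate, transform and
> writeToCassandra below are simplified stand-ins for our real code):
>
> import org.apache.spark.streaming.dstream.DStream
>
> def transform(value: String): String = value       // stand-in for the real transform
> def writeToCassandra(value: String): Unit = ()     // stand-in for the real Cassandra sink
>
> def runTL(stream: DStream[(String, String)]): Unit =
>   stream.foreachRDD { rdd =>
>     rdd.filter { case (_, value) => value.nonEmpty }   // the single filtering stage (stand-in predicate)
>       .foreachPartition { partition =>
>         partition.foreach { case (_, value) =>
>           writeToCassandra(transform(value))           // the 'TL' part
>         }
>       }
>   }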
>
> We have metrics on the executor work, which we collect and add together,
> indicated here by 'local computation'.  As you can see, we also measure how
> much it costs us to measure :-)
> Note that the 'local computation' times are comparable. What's not visible is
> the task scheduling and the consumption of the data from Kafka, which end up
> in the 'spark computation' part.
>
> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>
> Are there metrics available somehow on the Kafka reading time?
>
>                             Slow Task    Fast Task
> local computation           347.6        281.53
> spark computation           6930         263
> metric collection           70           138
> wall clock process          7000         401
> total records processed     4297         5002
>
> (time in ms)
>
> kr, Gerard.
>
>
> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Can you say anything more about what the job is doing?
>>
>> First thing I'd do is try to get some metrics on the time taken by your
>> code on the executors (e.g. when processing the iterator) to see if it's
>> consistent between the two situations.
>>
>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We recently migrated our streaming jobs to the direct Kafka receiver.
>>> Our initial migration went quite fine, but now we are seeing a weird zig-zag
>>> performance pattern we cannot explain.
>>> In alternating fashion, one task takes about 1 second to finish and the
>>> next takes 7 seconds, even though the streaming rate is stable.
>>>
>>> Here are comparable metrics for two successive tasks:
>>> *Slow*:
>>>
>>>
>>>
>>> Executor ID                                 Address                      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>> 20151006-044141-2408867082-5050-21047-S0    dnode-3.hdfs.private:36863   22 s       3            0             3
>>> 20151006-044141-2408867082-5050-21047-S1    dnode-0.hdfs.private:43812   40 s       11           0             11
>>> 20151006-044141-2408867082-5050-21047-S4    dnode-5.hdfs.private:59945   49 s       10           0             10
>>> *Fast*:
>>>
>>>
>>> Executor ID                                 Address                      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>> 20151006-044141-2408867082-5050-21047-S0    dnode-3.hdfs.private:36863   0.6 s      4            0             4
>>> 20151006-044141-2408867082-5050-21047-S1    dnode-0.hdfs.private:43812   1 s        9            0             9
>>> 20151006-044141-2408867082-5050-21047-S4    dnode-5.hdfs.private:59945   1 s        11           0             11
>>> We have some custom metrics that measure the wall-clock time of certain
>>> blocks of the job, e.g. the time it takes to do the local computations
>>> (the RDD.foreachPartition closure) vs. the total time.
>>> The difference between the slow and the fast task lies in the 'spark
>>> computation time', which is the wall-clock time around the task scheduling
>>> (the DStream.foreachRDD closure); see the timing sketch after the numbers
>>> below.
>>>
>>> e.g.
>>> Slow task:
>>>
>>> local computation time: 347.60968499999996, *spark computation time:
>>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>>
>>> Fast task:
>>> local computation time: 281.539042,* spark computation time: 263*,
>>> metric collection: 138, total process: 401, total_records: 5002
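>>>
>>> For reference, these timings are simple wall-clock measurements around each
>>> block, roughly like the sketch below (process and reportMetric are stand-ins
>>> for our real TL work and metric aggregation; the real job collects and adds
>>> the executor-side numbers separately):
>>>
>>> import org.apache.spark.streaming.dstream.DStream
>>>
>>> // Wall-clock timer: returns the block's result together with its duration in ms.
>>> def timed[T](block: => T): (T, Long) = {
>>>   val start = System.currentTimeMillis()
>>>   val result = block
>>>   (result, System.currentTimeMillis() - start)
>>> }
>>>
>>> def process(record: (String, String)): Unit = ()      // stand-in for the TL work
>>> def reportMetric(name: String, ms: Long): Unit = ()   // stand-in metric sink
>>>
>>> def instrument(dstream: DStream[(String, String)]): Unit =
>>>   dstream.foreachRDD { rdd =>
>>>     val (_, totalMs) = timed {                         // driver-side: 'spark computation' incl. scheduling
>>>       rdd.foreachPartition { partition =>
>>>         val (_, localMs) = timed {                     // executor-side: 'local computation'
>>>           partition.foreach(process)
>>>         }
>>>         reportMetric("local computation", localMs)
>>>       }
>>>     }
>>>     reportMetric("spark computation", totalMs)
>>>   }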
>>>
>>> We are currently running Spark 1.4.1. The load and the work to be done
>>> are stable - this is on a dev environment with that stuff under control.
>>>
>>> Any ideas what this behavior could be?
>>>
>>> thanks in advance,  Gerard.
>>>
>>
>
