Good point!

On Tue, Oct 6, 2015 at 4:23 PM, Cody Koeninger <c...@koeninger.org> wrote:

> I agree getting cassandra out of the picture is a good first step.
>
> But if you just do foreachRDD { _.count }, recent versions of direct stream
> shouldn't do any work at all on the executor (since the number of messages
> in the rdd is known already).
>
> Do a foreachPartition and println or count the iterator manually.
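
A minimal sketch of the manual count suggested above, written in plain Scala so it can be tried without a cluster. `countManually` is a hypothetical helper name, and the Spark usage in the trailing comment is an assumption about how it would be wired into the job, not code from this thread.

```scala
// Hypothetical helper: walks the iterator element by element so the
// partition is actually consumed, rather than relying on a count that
// is already known from the offsets.
def countManually[T](iter: Iterator[T]): Long = {
  var n = 0L
  while (iter.hasNext) {
    iter.next()
    n += 1
  }
  n
}

// Assumed usage inside the streaming job (needs Spark, so left as a comment):
//   stream.foreachRDD { rdd =>
//     rdd.foreachPartition { iter =>
//       println(s"records in partition: ${countManually(iter)}")
//     }
//   }
```

Consuming the iterator by hand forces the executor to fetch the messages, which is the work a bare count may skip.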
>
> On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Are you sure that this is not related to Cassandra inserts? Could you just do
>> foreachRDD { _.count } instead to keep Cassandra out of the picture and
>> then test this again?
>>
>> On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>
>>> Also check if the Kafka cluster is still balanced. If one of the
>>> brokers manages too many partitions, all the work will stay on that
>>> executor unless you repartition right after Kafka (and I'm not saying you
>>> should).
>>>
>>> Sent from my iPhone
>>>
>>> On 06 Oct 2015, at 22:17, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>> I'm not clear on what you're measuring.  Can you post relevant code
>>> snippets including the measurement code?
>>>
>>> As far as kafka metrics, nothing currently.  There is an info-level log
>>> message every time a kafka rdd iterator is instantiated,
>>>
>>>     log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
>>>       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>>
>>>
>>> If you log once you're done with an iterator you should be able to see
>>> the delta.
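
One possible way to log when an iterator is finished, sketched in plain Scala. `TimedIterator` and its `onDone` callback are hypothetical names introduced for illustration; the clock starts when the wrapper is built, which only approximates the moment the Kafka RDD iterator is instantiated.

```scala
// Hypothetical wrapper: calls `onDone` with the elapsed nanoseconds and the
// number of elements returned, the first time the underlying iterator
// reports that it is exhausted.
class TimedIterator[T](underlying: Iterator[T], onDone: (Long, Long) => Unit)
    extends Iterator[T] {
  private val start = System.nanoTime()
  private var count = 0L
  private var reported = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !reported) {
      reported = true
      onDone(System.nanoTime() - start, count)
    }
    more
  }

  override def next(): T = {
    val elem = underlying.next()
    count += 1
    elem
  }
}

// Assumed usage inside the job (needs Spark, so left as a comment):
//   rdd.foreachPartition { iter =>
//     val timed = new TimedIterator(iter, (ns: Long, n: Long) =>
//       println(s"iterator done: $n records in ${ns / 1000000} ms"))
//     timed.foreach(record => /* TL work */ ())
//   }
```

Comparing the timestamp of this message with the "Computing topic ..." log line for the same partition would give the delta per iterator.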
>>>
>>> The other thing to try is reduce the number of parts involved in the job
>>> to isolate it ... first thing I'd do there is take cassandra out of the
>>> equation.
>>>
>>>
>>>
>>> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <gerard.m...@gmail.com>
>>> wrote:
>>>
>>>> Hi Cody,
>>>>
>>>> The job is doing ETL from Kafka records to Cassandra. After a
>>>> single filtering stage on Spark, the 'TL' part is done using the
>>>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>>>>
>>>> We have metrics on the executor work which we collect and add together,
>>>> indicated here by 'local computation'.  As you can see, we also measure how
>>>> much it cost us to measure :-)
>>>> See how 'local work'  times are comparable.  What's not visible is the
>>>> task scheduling and consuming the data from Kafka which becomes part of the
>>>> 'spark computation' part.
>>>>
>>>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>>>
>>>> Are there metrics available somehow on the Kafka reading time?
>>>>
>>>>                            Slow Task   Fast Task
>>>> local computation              347.6      281.53
>>>> spark computation               6930         263
>>>> metric collection                 70         138
>>>> wall clock process              7000         401
>>>> total records processed         4297        5002
>>>>
>>>> (time in ms)
>>>>
>>>> kr, Gerard.
>>>>
>>>>
>>>> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Can you say anything more about what the job is doing?
>>>>>
>>>>> First thing I'd do is try to get some metrics on the time taken by
>>>>> your code on the executors (e.g. when processing the iterator) to see if
>>>>> it's consistent between the two situations.
>>>>>
>>>>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We recently migrated our streaming jobs to the direct kafka receiver.
>>>>>> Our initial migration went quite fine but now we are seeing a weird
>>>>>> zig-zag performance pattern we cannot explain.
>>>>>> In alternating fashion, one task takes about 1 second to finish and
>>>>>> the next takes 7 seconds, at a stable streaming rate.
>>>>>>
>>>>>> Here are comparable metrics for two successive tasks:
>>>>>> *Slow*:
>>>>>>
>>>>>> Executor ID                               Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>>>>> 20151006-044141-2408867082-5050-21047-S0  dnode-3.hdfs.private:36863  22 s       3            0             3
>>>>>> 20151006-044141-2408867082-5050-21047-S1  dnode-0.hdfs.private:43812  40 s       11           0             11
>>>>>> 20151006-044141-2408867082-5050-21047-S4  dnode-5.hdfs.private:59945  49 s       10           0             10
>>>>>>
>>>>>> *Fast*:
>>>>>>
>>>>>> Executor ID                               Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>>>>> 20151006-044141-2408867082-5050-21047-S0  dnode-3.hdfs.private:36863  0.6 s      4            0             4
>>>>>> 20151006-044141-2408867082-5050-21047-S1  dnode-0.hdfs.private:43812  1 s        9            0             9
>>>>>> 20151006-044141-2408867082-5050-21047-S4  dnode-5.hdfs.private:59945  1 s        11           0             11
>>>>>>
>>>>>> We have some custom metrics that measure wall-clock time of execution
>>>>>> of certain blocks of the job, like the time it takes to do the local
>>>>>> computations (RDD.foreachPartition closure) vs total time.
>>>>>> The difference between the slow and fast executing task is in the
>>>>>> 'spark computation time', which is the wall-clock time for the task
>>>>>> scheduling (DStream.foreachRDD closure).
>>>>>>
>>>>>> e.g.
>>>>>> Slow task:
>>>>>>
>>>>>> local computation time: 347.60968499999996, *spark computation time:
>>>>>> 6930*, metric collection: 70, total process: 7000, total_records:
>>>>>> 4297
>>>>>>
>>>>>> Fast task:
>>>>>> local computation time: 281.539042, *spark computation time: 263*,
>>>>>> metric collection: 138, total process: 401, total_records: 5002
>>>>>>
>>>>>> We are currently running Spark 1.4.1. The load and the work to be
>>>>>> done is stable; this is on a dev env with that stuff under control.
>>>>>>
>>>>>> Any ideas what this behavior could be?
>>>>>>
>>>>>> thanks in advance,  Gerard.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
