Are you sure that this is not related to the Cassandra inserts? Could you
just do foreachRDD { _.count } instead, to keep Cassandra out of the
picture, and then test this again?
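
Something along these lines (untested; `stream` here stands for the DStream
you get from KafkaUtils.createDirectStream, and `log` for whatever logger you
already use):

    // Forces the full read from Kafka for every batch, but skips the
    // Cassandra writes entirely.
    stream.foreachRDD { rdd =>
      log.info(s"batch contained ${rdd.count()} records")
    }

If the 1s / 7s zig-zag is still there, Cassandra is off the hook.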

On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <atan...@adobe.com> wrote:

> Also check if the Kafka cluster is still balanced. Maybe one of the
> brokers manages too many partitions; if so, all the work will stay on
> that executor unless you repartition right after Kafka (and I'm not
> saying you should).
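>
> If you do want to experiment with that, a rough sketch (the stream name
> and the partition count below are just placeholders, pick what fits your
> cluster):
>
>     // spread each batch evenly across the executors before the heavy
>     // per-record work; this adds a shuffle, so only keep it if it helps
>     val spread = directKafkaStream.repartition(numExecutors * coresPerExecutor)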
>
> Sent from my iPhone
>
> On 06 Oct 2015, at 22:17, Cody Koeninger <c...@koeninger.org> wrote:
>
> I'm not clear on what you're measuring.  Can you post relevant code
> snippets including the measurement code?
>
> As far as kafka metrics, nothing currently.  There is an info-level log
> message every time a kafka rdd iterator is instantiated,
>
>     log.info(s"Computing topic ${part.topic}, partition ${part.partition}
> " +
>
>       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>
>
> If you log once you're done with an iterator, you should be able to see
> the delta.
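>
> Something like this (untested sketch; process() below is a stand-in for
> whatever you do per record):
>
>     rdd.foreachPartition { iter =>
>       val start = System.currentTimeMillis()
>       iter.foreach(process)   // consuming the iterator is what pulls the data from kafka
>       log.info(s"done with partition iterator after ${System.currentTimeMillis() - start} ms")
>     }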
>
> The other thing to try is to reduce the number of parts involved in the
> job to isolate it ... the first thing I'd do there is take Cassandra out
> of the equation.
>
>
>
> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> The job is doing ETL from Kafka records to Cassandra. After a
>> single filtering stage in Spark, the 'TL' part is done using the
>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
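>>
>> Schematically it looks like this (heavily simplified; transformAndWrite
>> and the report* helpers below are placeholders, not our actual code):
>>
>>     dstream.foreachRDD { rdd =>
>>       val sparkStart = System.currentTimeMillis()
>>       rdd.foreachPartition { records =>
>>         val localStart = System.currentTimeMillis()
>>         records.foreach(transformAndWrite)   // the '...TL ...' part, writing to Cassandra
>>         reportLocalTime(System.currentTimeMillis() - localStart)   // -> 'local computation'
>>       }
>>       reportSparkTime(System.currentTimeMillis() - sparkStart)     // -> 'spark computation'
>>     }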
>>
>> We have metrics on the executor work, which we collect and add together;
>> they are indicated here by 'local computation'. As you can see, we also
>> measure how much it costs us to measure :-)
>> Note how the 'local computation' times are comparable. What's not visible
>> is the task scheduling and the consumption of the data from Kafka, which
>> both end up in the 'spark computation' figure.
>>
>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>
>> Are there metrics available somehow on the Kafka reading time?
>>
>>                           Slow Task   Fast Task
>> local computation             347.6      281.53
>> spark computation              6930         263
>> metric collection                70         138
>> wall clock process             7000         401
>> total records processed        4297        5002
>>
>> (time in ms)
>>
>> kr, Gerard.
>>
>>
>> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Can you say anything more about what the job is doing?
>>>
>>> First thing I'd do is try to get some metrics on the time taken by your
>>> code on the executors (e.g. when processing the iterator) to see if it's
>>> consistent between the two situations.
>>>
>>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> We recently migrated our streaming jobs to the direct Kafka receiver.
>>>> Our initial migration went quite smoothly, but now we are seeing a weird
>>>> zig-zag performance pattern that we cannot explain.
>>>> In alternating fashion, one task takes about 1 second to finish and the
>>>> next takes 7 seconds, at a stable streaming rate.
>>>>
>>>> Here are comparable metrics for two successive tasks:
>>>> *Slow*:
>>>>
>>>> Executor ID                                Address                      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>>> 20151006-044141-2408867082-5050-21047-S0   dnode-3.hdfs.private:36863   22 s       3            0             3
>>>> 20151006-044141-2408867082-5050-21047-S1   dnode-0.hdfs.private:43812   40 s       11           0             11
>>>> 20151006-044141-2408867082-5050-21047-S4   dnode-5.hdfs.private:59945   49 s       10           0             10
>>>> *Fast*:
>>>>
>>>> Executor ID                                Address                      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>>> 20151006-044141-2408867082-5050-21047-S0   dnode-3.hdfs.private:36863   0.6 s      4            0             4
>>>> 20151006-044141-2408867082-5050-21047-S1   dnode-0.hdfs.private:43812   1 s        9            0             9
>>>> 20151006-044141-2408867082-5050-21047-S4   dnode-5.hdfs.private:59945   1 s        11           0             11
>>>> We have some custom metrics that measure the wall-clock execution time
>>>> of certain blocks of the job, like the time it takes to do the local
>>>> computations (RDD.foreachPartition closure) vs. the total time.
>>>> The difference between the slow and the fast task lies in the
>>>> 'spark computation time', which is the wall-clock time for the task
>>>> scheduling (DStream.foreachRDD closure).
>>>>
>>>> e.g.
>>>> Slow task:
>>>>
>>>> local computation time: 347.60968499999996, *spark computation time:
>>>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>>>
>>>> Fast task:
>>>> local computation time: 281.539042, *spark computation time: 263*,
>>>> metric collection: 138, total process: 401, total_records: 5002
>>>>
>>>> We are currently running Spark 1.4.1. The load and the work to be done
>>>> are stable - this is on a dev env where we keep that under control.
>>>>
>>>> Any ideas about what could cause this behavior?
>>>>
>>>> thanks in advance,  Gerard.
>>>>
>>>
>>
>
