I found the problem. The final step of the job indexes the data into
Elasticsearch, and the Elasticsearch instance in one of the clusters is
overwhelmed and isn't working correctly.
I pointed the failing cluster at another ES instance and there is no delay
any more.
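
For anyone reading the logs quoted below: the JobScheduler adds a batch every
5 s, but each batch took about 60 s to finish while ES was struggling, so the
backlog could only grow. A rough sketch of that arithmetic in Python (no Spark
involved; the function name and the constant-processing-time model are my own
simplification):

```python
def scheduling_delay_s(batches_done, batch_interval_s, processing_time_s):
    """Approximate scheduling delay after `batches_done` batches.

    Simplified model (an assumption, not Spark's own accounting): batches
    run one at a time and every batch takes the same processing time, so
    each one adds (processing_time - batch_interval) seconds of backlog.
    """
    lag_per_batch = max(0.0, processing_time_s - batch_interval_s)
    return batches_done * lag_per_batch


# Numbers taken from the logs in this thread: 5 s between batches,
# 60.399 s execution on the slow cluster, 0.540 s on the healthy one.
slow = scheduling_delay_s(24, 5.0, 60.399)
healthy = scheduling_delay_s(24, 5.0, 0.540)
print(f"slow cluster:    ~{slow:.0f} s behind after 24 batches")
print(f"healthy cluster: ~{healthy:.0f} s behind after 24 batches")
```

Under this model the slow cluster falls more than 1300 s behind by job 24,
which is roughly in line with the "Total delay: 1429.249 s" in the log, while
the healthy cluster never falls behind.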

Sorry, it wasn't related to Spark!
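
Since spark.streaming.kafka.maxRatePerPartition came up in the thread below:
it is a per-partition limit in records per second, so the cap on one batch is
that rate times the batch interval times the number of partitions. A small
sketch of the arithmetic in Python; the rate of 1000 is a made-up value, not
something I measured or recommend:

```python
def max_records_per_batch(max_rate_per_partition, batch_interval_s, num_partitions):
    """Upper bound on records a direct-stream batch can read when
    spark.streaming.kafka.maxRatePerPartition is set (records/s/partition)."""
    return int(max_rate_per_partition * batch_interval_s * num_partitions)


# Setup from this thread: 5 s batches, three topics with one partition each.
# The rate (1000 records/s per partition) is a hypothetical example value.
cap = max_records_per_batch(1000, 5, 3)
print(cap)  # 15000

# The setting itself can be passed to spark-submit, for example:
#   --conf spark.streaming.kafka.maxRatePerPartition=1000
```

Without the setting there is no cap, as Cody says below, so a batch reads
everything up to the latest available offset.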




2015-07-31 9:15 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>:

> It doesn't make sense to me, because the other cluster processes all the
> data in less than a second.
> Anyway, I'm going to set that parameter.
>
> 2015-07-31 0:36 GMT+02:00 Tathagata Das <t...@databricks.com>:
>
>> Yes, and that is indeed the problem. It is trying to process all the data
>> in Kafka, and therefore taking 60 seconds. You need to set the rate limits
>> for that.
>>
>> On Thu, Jul 30, 2015 at 8:51 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> If you don't set it, there is no maximum rate; it will get everything
>>> from the end of the last batch to the maximum available offset.
>>>
>>> On Thu, Jul 30, 2015 at 10:46 AM, Guillermo Ortiz <konstt2...@gmail.com>
>>> wrote:
>>>
>>>> The difference is that one receives more data than the other two. I can
>>>> pass the topics as parameters, so I could run the code with one topic at
>>>> a time and figure out which topic it is, although I guess it's the
>>>> topic that gets the most data.
>>>>
>>>> Anyway, it's pretty weird to see those delays in just one of the clusters,
>>>> even when the other one is not running.
>>>> I have seen the parameter "spark.streaming.kafka.maxRatePerPartition".
>>>> I haven't set any value for it; how does it work if this
>>>> parameter doesn't have a value?
>>>>
>>>> 2015-07-30 16:32 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>>>
>>>>> If the jobs are running on different topicpartitions, what's different
>>>>> about them?  Is one of them 120x the throughput of the other, for
>>>>> instance?  You should be able to eliminate cluster config as a difference
>>>>> by running the same topic partition on the different clusters and
>>>>> comparing the results.
>>>>>
>>>>> On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz <konstt2...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I have three topics with one partition each, so each job runs
>>>>>> on one topic.
>>>>>>
>>>>>> 2015-07-30 16:20 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>>>>>
>>>>>>> Just so I'm clear, the difference in timing you're talking about is
>>>>>>> this:
>>>>>>>
>>>>>>> 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
>>>>>>> MetricsSpark.scala:67, took 60.391761 s
>>>>>>>
>>>>>>> 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
>>>>>>> MetricsSpark.scala:67, took 0.531323 s
>>>>>>>
>>>>>>>
>>>>>>> Are those jobs running on the same topicpartition?
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz
>>>>>>> <konstt2...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I read about the maxRatePerPartition parameter; I haven't set it.
>>>>>>>> Could that be the problem? Although this wouldn't explain why it
>>>>>>>> doesn't work in only one of the clusters.
>>>>>>>>
>>>>>>>> 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>:
>>>>>>>>
>>>>>>>>> They only share Kafka; the rest of the resources are independent.
>>>>>>>>> I tried stopping one cluster and running only the cluster that isn't
>>>>>>>>> working, but the same thing happens.
>>>>>>>>>
>>>>>>>>> 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz <konstt2...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> I have a problem with the JobScheduler. I have run the same
>>>>>>>>>> code in two clusters. I read from three topics in Kafka with
>>>>>>>>>> DirectStream, so I have three tasks.
>>>>>>>>>>
>>>>>>>>>> I have checked YARN and there aren't any other jobs launched.
>>>>>>>>>>
>>>>>>>>>> On the cluster where I have trouble, I get these logs:
>>>>>>>>>>
>>>>>>>>>> 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage
>>>>>>>>>> 24.0 (TID 72, xxxxxxxxx, RACK_LOCAL, 14856 bytes)
>>>>>>>>>> 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage
>>>>>>>>>> 24.0 (TID 73, xxxxxxxxxxxxxxx, RACK_LOCAL, 14852 bytes)
>>>>>>>>>> 15/07/30 14:32:58 INFO BlockManagerInfo: Added
>>>>>>>>>> broadcast_24_piece0 in memory on xxxxxxxxxxx:44909 (size: 1802.0 B,
>>>>>>>>>> free: 530.3 MB)
>>>>>>>>>> 15/07/30 14:32:58 INFO BlockManagerInfo: Added
>>>>>>>>>> broadcast_24_piece0 in memory on xxxxxxxxx:43477 (size: 1802.0 B,
>>>>>>>>>> free: 530.3 MB)
>>>>>>>>>> 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage
>>>>>>>>>> 24.0 (TID 74, xxxxxxxxx, RACK_LOCAL, 14860 bytes)
>>>>>>>>>> 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage
>>>>>>>>>> 24.0 (TID 72) in 208 ms on xxxxxxxxx (1/3)
>>>>>>>>>> 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage
>>>>>>>>>> 24.0 (TID 74) in 49 ms on xxxxxxxxx (2/3)
>>>>>>>>>> *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time
>>>>>>>>>> 1438259580000 ms*
>>>>>>>>>> *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time
>>>>>>>>>> 1438259585000 ms*
>>>>>>>>>> *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time
>>>>>>>>>> 1438259590000 ms*
>>>>>>>>>> *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time
>>>>>>>>>> 1438259595000 ms*
>>>>>>>>>> *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time
>>>>>>>>>> 1438259600000 ms*
>>>>>>>>>> *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time
>>>>>>>>>> 1438259605000 ms*
>>>>>>>>>> *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time
>>>>>>>>>> 1438259610000 ms*
>>>>>>>>>> *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time
>>>>>>>>>> 1438259615000 ms*
>>>>>>>>>> *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time
>>>>>>>>>> 1438259620000 ms*
>>>>>>>>>> *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time
>>>>>>>>>> 1438259625000 ms*
>>>>>>>>>> *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time
>>>>>>>>>> 1438259630000 ms*
>>>>>>>>>> *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time
>>>>>>>>>> 1438259635000 ms*
>>>>>>>>>> 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage
>>>>>>>>>> 24.0 (TID 73) in 60373 ms on xxxxxxxxxxxxxxxx (3/3)
>>>>>>>>>> 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose
>>>>>>>>>> tasks have all completed, from pool
>>>>>>>>>> 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
>>>>>>>>>> MetricsSpark.scala:67) finished in 60.379 s
>>>>>>>>>> 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD
>>>>>>>>>> at MetricsSpark.scala:67, took 60.391761 s
>>>>>>>>>> 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
>>>>>>>>>> 1438258210000 ms.0 from job set of time 1438258210000 ms
>>>>>>>>>> 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for
>>>>>>>>>> time 1438258210000 ms (execution: 60.399 s)
>>>>>>>>>> 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
>>>>>>>>>> 1438258215000 ms.0 from job set of time 1438258215000 ms
>>>>>>>>>>
>>>>>>>>>> There is *always* a minute of delay in the third task. When I
>>>>>>>>>> run the same code in the other cluster, there isn't this delay in
>>>>>>>>>> the JobScheduler. I checked the YARN configuration in both clusters
>>>>>>>>>> and it seems to be the same.
>>>>>>>>>>
>>>>>>>>>> The log from the cluster that is working well is:
>>>>>>>>>>
>>>>>>>>>> 15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3
>>>>>>>>>> tasks
>>>>>>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage
>>>>>>>>>> 93.0 (TID 279, xxxxxxxxxxxxxxxxxx, RACK_LOCAL, 14643 bytes)
>>>>>>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage
>>>>>>>>>> 93.0 (TID 280, xxxxxxxxx, RACK_LOCAL, 14639 bytes)
>>>>>>>>>> 15/07/30 14:37:35 INFO BlockManagerInfo: Added
>>>>>>>>>> broadcast_93_piece0 in memory on xxxxxxxxxxxxxxxxx:45132 (size:
>>>>>>>>>> 1801.0 B, free: 530.3 MB)
>>>>>>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage
>>>>>>>>>> 93.0 (TID 281, xxxxxxxxxxxxxxxxxxx, RACK_LOCAL, 14647 bytes)
>>>>>>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage
>>>>>>>>>> 93.0 (TID 279) in 121 ms on xxxxxxxxxxxxxxxxxxxx (1/3)
>>>>>>>>>> 15/07/30 14:37:35 INFO BlockManagerInfo: Added
>>>>>>>>>> broadcast_93_piece0 in memory on xxxxxxxxx:49886 (size: 1801.0 B,
>>>>>>>>>> free: 530.3 MB)
>>>>>>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Finished task 2.0 in stage
>>>>>>>>>> 93.0 (TID 281) in 261 ms on xxxxxxxxxxxxxxxxxx (2/3)
>>>>>>>>>> 15/07/30 14:37:35 INFO TaskSetManager: Finished task 1.0 in stage
>>>>>>>>>> 93.0 (TID 280) in 519 ms on xxxxxxxxx (3/3)
>>>>>>>>>> 15/07/30 14:37:35 INFO DAGScheduler: Stage 93 (foreachRDD at
>>>>>>>>>> MetricsSpark.scala:67) finished in 0.522 s
>>>>>>>>>> 15/07/30 14:37:35 INFO YarnScheduler: Removed TaskSet 93.0, whose
>>>>>>>>>> tasks have all completed, from pool
>>>>>>>>>> 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD
>>>>>>>>>> at MetricsSpark.scala:67, took 0.531323 s
>>>>>>>>>> 15/07/30 14:37:35 INFO JobScheduler: Finished job streaming job
>>>>>>>>>> 1438259855000 ms.0 from job set of time 1438259855000 ms
>>>>>>>>>> 15/07/30 14:37:35 INFO JobScheduler: Total delay: 0.548 s for
>>>>>>>>>> time 1438259855000 ms (execution: 0.540 s)
>>>>>>>>>> 15/07/30 14:37:35 INFO KafkaRDD: Removing RDD 184 from
>>>>>>>>>> persistence list
>>>>>>>>>>
>>>>>>>>>> Any clue about where I could take a look? The number of CPUs in YARN
>>>>>>>>>> is enough. I'm running on YARN with the same options (--master
>>>>>>>>>> yarn-server with 1g of memory in both).
>>>>>>>>>>
