Are you sure you are not accidentally recovering from checkpoint? How are
you using StreamingContext.getOrCreate() in your code?
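
For reference, the usual getOrCreate() pattern looks roughly like the sketch
below (the factory and the checkpointDir variable are illustrative, not taken
from your code). The point is that when getOrCreate() finds existing checkpoint
data, the context, including its batch duration, is rebuilt from the
checkpoint and the factory is never invoked, so a new batchDurationMillis
value would not take effect:

    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        // Runs only on a fresh start; on recovery the batch duration
        // comes from the checkpoint, not from this code.
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
            Durations.milliseconds(batchDurationMillis));
        jssc.checkpoint(checkpointDir);  // checkpointDir is illustrative
        // ... set up the DStreams here ...
        return jssc;
      }
    };

    JavaStreamingContext jssc =
        JavaStreamingContext.getOrCreate(checkpointDir, factory);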

TD

On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com>
wrote:

> Tathagata,
>
> In our logs I see the batch duration millis being set first to 10 and then
> to 20 seconds. I don't see the 20 being reflected later during ingestion.
>
> In the Spark UI under Streaming I see the output below; notice the
> *10 second* batch interval. Can you think of a reason why it's stuck at 10?
> It used to be 1 second, by the way; then somehow, over the course of a few
> restarts, we managed to get it to 10 seconds. Now it won't get reset to
> 20 seconds. Any ideas?
>
> Streaming
>
>    - *Started at: *Thu Sep 03 10:59:03 EDT 2015
>    - *Time since start: *1 day 8 hours 44 minutes
>    - *Network receivers: *0
>    - *Batch interval: *10 seconds
>    - *Processed batches: *11790
>    - *Waiting batches: *0
>    - *Received records: *0
>    - *Processed records: *0
>
>
>
> Statistics over last 100 processed batches
>
> Receiver Statistics
> No receivers
>
> Batch Processing Statistics
>
>    Metric            Last batch  Minimum  25th percentile  Median  75th percentile  Maximum
>    Processing Time   23 ms       7 ms     10 ms            11 ms   14 ms            172 ms
>    Scheduling Delay   1 ms       0 ms      0 ms             0 ms    1 ms              2 ms
>    Total Delay       24 ms       8 ms     10 ms            12 ms   14 ms            173 ms
>
>
>
>
>
> On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Could you see what the streaming tab in the Spark UI says? It should show
>> the underlying batch duration of the StreamingContext, the details of when
>> the batch starts, etc.
>>
>> BTW, it seems that the 5.6 or 6.8 second delay is present only when data
>> is present (that is, *Documents processed > 0*).
>>
>> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Tathagata,
>>>
>>> Checkpointing is turned on but we were not recovering. I'm looking at
>>> the logs now, feeding fresh content hours after the restart. Here's a
>>> snippet:
>>>
>>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>>> ...
>>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>>
>>> Interestingly, the fresh content (4 documents) is fed about 5.6 seconds
>>> after the previous batch, not 10 seconds. The second fresh batch comes in
>>> 6.8 seconds after the previous empty one.
>>>
>>> Granted, the log message is printed after iterating over the messages,
>>> which may account for some of the time differences. But generally, looking
>>> at the log messages printed before we iterate, it's still 10 seconds each
>>> time, not 20, which is what batchDurationMillis is currently set to.
>>>
>>> Code:
>>>
>>> JavaPairInputDStream<String, String> messages =
>>>     KafkaUtils.createDirectStream(....);
>>> messages.checkpoint(Durations.milliseconds(checkpointMillis));
>>>
>>> JavaDStream<String> messageBodies = messages.map(
>>>     new Function<Tuple2<String, String>, String>() {
>>>       @Override
>>>       public String call(Tuple2<String, String> tuple2) {
>>>         return tuple2._2();
>>>       }
>>>     });
>>>
>>> messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>   @Override
>>>   public Void call(JavaRDD<String> rdd) throws Exception {
>>>     ProcessPartitionFunction func = new ProcessPartitionFunction(...);
>>>     rdd.foreachPartition(func);
>>>     return null;
>>>   }
>>> });
>>>
>>> The log message comes from ProcessPartitionFunction:
>>>
>>> public void call(Iterator<String> messageIterator) throws Exception {
>>>   log.info("Starting data partition processing (appName={}, topic={})...",
>>>       appName, topic);
>>>   // ... iterate over the messages, counting documents ...
>>>   log.info("Finished data partition processing (appName={}, topic={}). "
>>>       + "Documents processed: {}.", appName, topic, docCount);
>>> }
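>>>
>>> For what it's worth, one quick way I could double-check which batch
>>> interval the context is actually using (just a sketch; Function2 is
>>> org.apache.spark.api.java.function.Function2 and Time is
>>> org.apache.spark.streaming.Time) is to log the batch time that Spark
>>> passes to the two-argument foreachRDD variant:
>>>
>>> messageBodies.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
>>>   @Override
>>>   public Void call(JavaRDD<String> rdd, Time batchTime) {
>>>     // Successive batch times should differ by the effective batch interval.
>>>     log.info("Batch time: {}", batchTime);
>>>     return null;
>>>   }
>>> });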
>>>
>>> Any ideas? Thanks.
>>>
>>> - Dmitry
>>>
>>> On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Are you accidentally recovering from checkpoint files which have 10
>>>> seconds as the batch interval?
>>>>
>>>>
>>>> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> I'm seeing an oddity where I initially set batchDurationMillis to
>>>>> 1 second and it works fine:
>>>>>
>>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>> Durations.milliseconds(batchDurationMillis));
>>>>>
>>>>> Then I tried changing the value to 10 seconds. The change didn't seem
>>>>> to take at first. I bounced the Spark workers and the consumers, and now
>>>>> I'm seeing RDDs coming in roughly every 10 seconds (not always exactly
>>>>> 10 seconds according to the logs).
>>>>>
>>>>> However, now I'm trying to change the value to 20 seconds and it's
>>>>> just not taking. I've bounced Spark master, workers, and consumers and the
>>>>> value seems "stuck" at 10 seconds.
>>>>>
>>>>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> - Dmitry
>>>>>
>>>>>
>>>>
>>>
>>
>
