Tathagata,

Checkpointing is turned on but we were not recovering. I'm looking at the
logs now, feeding fresh content hours after the restart. Here's a snippet:

2015-09-04 06:11:20,013 ... Documents processed: 0.
2015-09-04 06:11:30,014 ... Documents processed: 0.
2015-09-04 06:11:40,011 ... Documents processed: 0.
2015-09-04 06:11:50,012 ... Documents processed: 0.
2015-09-04 06:12:00,010 ... Documents processed: 0.
2015-09-04 06:12:10,047 ... Documents processed: 0.
2015-09-04 06:12:20,012 ... Documents processed: 0.
2015-09-04 06:12:30,011 ... Documents processed: 0.
2015-09-04 06:12:40,012 ... Documents processed: 0.
*2015-09-04 06:12:55,629 ... Documents processed: 4.*
2015-09-04 06:13:00,018 ... Documents processed: 0.
2015-09-04 06:13:10,012 ... Documents processed: 0.
2015-09-04 06:13:20,019 ... Documents processed: 0.
2015-09-04 06:13:30,014 ... Documents processed: 0.
2015-09-04 06:13:40,041 ... Documents processed: 0.
2015-09-04 06:13:50,009 ... Documents processed: 0.
...
2015-09-04 06:17:30,019 ... Documents processed: 0.
*2015-09-04 06:17:46,832 ... Documents processed: 40.*

Interestingly, the batch with the fresh content (4 documents) is logged at
06:12:55,629, about 15.6 seconds after the previous empty batch, i.e. 5.6
seconds past the expected 10-second tick. The second fresh batch, at
06:17:46,832, lands 16.8 seconds after the previous empty one, again 6.8
seconds off the tick.

Granted, the "Documents processed" message is printed after iterating over
the messages, which may account for some of the time differences. But even
looking at the "Starting" messages printed before we iterate, the cadence is
still 10 seconds each time, not the 20 seconds that batchDurationMillis is
currently set to.
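
Could this come down to how we create the streaming context? My
understanding is that checkpoint recovery only happens when the context is
built via getOrCreate, and that a recovered context keeps the batch duration
stored in the checkpoint, ignoring whatever the code now passes in. A rough
sketch of that pattern (checkpointDir here is a placeholder for our
configured path):

JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(
    checkpointDir,
    new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        // Only runs when no checkpoint exists yet; on recovery, the
        // checkpointed context (and its batch duration) wins.
        JavaStreamingContext newJssc = new JavaStreamingContext(
            sparkConf, Durations.milliseconds(batchDurationMillis));
        newJssc.checkpoint(checkpointDir);
        // The full DStream graph must be set up here before returning.
        return newJssc;
      }
    });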

Code:

JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(....);
messages.checkpoint(Durations.milliseconds(checkpointMillis));
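
(Note: as I understand it, DStream.checkpoint() above only sets the data
checkpoint interval for this one stream; for driver recovery the checkpoint
directory also has to be set on the context itself, along the lines of the
line below, where checkpointDir is a placeholder for our configured path.)

// Enables metadata checkpointing for the whole streaming context.
jssc.checkpoint(checkpointDir);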


JavaDStream<String> messageBodies = messages.map(
    new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        // Extract the message body from the (key, value) tuple.
        return tuple2._2();
      }
    });

messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> rdd) throws Exception {
    ProcessPartitionFunction func = new ProcessPartitionFunction(...);
    rdd.foreachPartition(func);
    return null;
  }
});

The log message comes from ProcessPartitionFunction:

public void call(Iterator<String> messageIterator) throws Exception {
    log.info("Starting data partition processing (appName={}, topic={})...",
        appName, topic);
    // ... iterate ...
    log.info("Finished data partition processing (appName={}, topic={}). " +
        "Documents processed: {}.", appName, topic, docCount);
}
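
The elided iteration is essentially just a count-and-process loop, roughly
like this (processDocument is a placeholder for our actual per-document
logic):

int docCount = 0;
while (messageIterator.hasNext()) {
    String message = messageIterator.next();
    processDocument(message); // placeholder for the real work
    docCount++;
}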

Any ideas? Thanks.

- Dmitry

On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <t...@databricks.com> wrote:

> Are you accidentally recovering from checkpoint files which have 10 seconds
> as the batch interval?
>
>
> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> I'm seeing an oddity where I initially set batchDurationMillis to 1
>> second and it works fine:
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> Durations.milliseconds(batchDurationMillis));
>>
>> Then I tried changing the value to 10 seconds. The change didn't seem to
>> take. I bounced the Spark workers and the consumers, and now I'm seeing
>> RDDs coming in about once every 10 seconds (not always exactly 10 seconds,
>> according to the logs).
>>
>> However, now I'm trying to change the value to 20 seconds and it's just
>> not taking. I've bounced the Spark master, workers, and consumers, and the
>> value seems "stuck" at 10 seconds.
>>
>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>
>> Thanks.
>>
>> - Dmitry
>>
>>
>
