Sorry, more relevant code below:

SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
JavaStreamingContext jssc = params.isCheckpointed()
    ? createCheckpointedContext(sparkConf, params)
    : createContext(sparkConf, params);
jssc.start();
jssc.awaitTermination();
jssc.close();
……………………………..
  private JavaStreamingContext createCheckpointedContext(final SparkConf sparkConf,
      final Parameters params) {
    // The factory is only invoked when no checkpoint data exists in the checkpoint directory.
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        return createContext(sparkConf, params);
      }
    };
    return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
  }

  private JavaStreamingContext createContext(SparkConf sparkConf, final Parameters params) {
    // Create context with the specified batch interval, in milliseconds.
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
        Durations.milliseconds(params.getBatchDurationMillis()));
    // Set the checkpoint directory, if we're checkpointing
    if (params.isCheckpointed()) {
      jssc.checkpoint(params.getCheckpointDir());
    }

    Set<String> topicsSet = new HashSet<String>(Arrays.asList(params.getTopic()));

    // Set the Kafka parameters.
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params.getBrokerList());
    if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
      kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params.getAutoOffsetReset());
    }

    // Create direct Kafka stream with the brokers and the topic.
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
      jssc,
      String.class,
      String.class,
      StringDecoder.class,
      StringDecoder.class,
      kafkaParams,
      topicsSet);

    // See if there's an override of the default checkpoint duration.
    if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
      messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
    }

    JavaDStream<String> messageBodies = messages.map(
        new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

    messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        ProcessPartitionFunction func = new ProcessPartitionFunction(params);
        rdd.foreachPartition(func);
        return null;
      }
    });

    return jssc;
}
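
For reference, the get-or-create behaviour that the question quoted below is about can be modelled in a few lines. This is only a simplified sketch, not Spark's implementation: `ToyContext`, `CheckpointModel`, and the in-memory map standing in for the checkpoint directory are all hypothetical names made up for illustration, and the lambdas assume Java 8. It shows the one property that matters here: when checkpoint data exists, the context is rebuilt from it and the factory (and therefore any newly configured batch interval) is never consulted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Toy stand-in for a streaming context: it only remembers its batch interval. */
final class ToyContext {
    final long batchMillis;

    ToyContext(long batchMillis) {
        this.batchMillis = batchMillis;
    }
}

/** Simplified model of get-or-create semantics (hypothetical, not Spark code). */
final class CheckpointModel {
    // Hypothetical in-memory "checkpoint store": directory name -> saved batch interval.
    private final Map<String, Long> checkpoints = new HashMap<>();

    ToyContext getOrCreate(String dir, Supplier<ToyContext> factory) {
        Long saved = checkpoints.get(dir);
        if (saved != null) {
            // Checkpoint found: rebuild from the saved state; the factory is never called.
            return new ToyContext(saved);
        }
        // No checkpoint: build a fresh context and record its interval.
        // (Simplification: a real system writes checkpoints while running, not at creation.)
        ToyContext fresh = factory.get();
        checkpoints.put(dir, fresh.batchMillis);
        return fresh;
    }

    /** Models deleting the checkpoint directory. */
    void clear(String dir) {
        checkpoints.remove(dir);
    }

    public static void main(String[] args) {
        CheckpointModel model = new CheckpointModel();
        long first = model.getOrCreate("cp", () -> new ToyContext(10_000L)).batchMillis;
        // Restart with a new setting: the old checkpoint still wins.
        long second = model.getOrCreate("cp", () -> new ToyContext(20_000L)).batchMillis;
        // After clearing the checkpoint, the new setting takes effect.
        model.clear("cp");
        long third = model.getOrCreate("cp", () -> new ToyContext(20_000L)).batchMillis;
        System.out.println(first + " " + second + " " + third); // prints "10000 10000 20000"
    }
}
```

If the same thing is happening in the code above, a stale checkpoint directory would keep the interval at 10 seconds no matter what getBatchDurationMillis() returns, until that directory is cleared or a different one is used.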

On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com>
wrote:

> I'd think that we wouldn't be "accidentally recovering from checkpoint"
> hours or even days after consumers have been restarted, plus the content is
> the fresh content that I'm feeding, not some content that had been fed
> before the last restart.
>
> The code is basically as follows:
>
>     SparkConf sparkConf = createSparkConf(...);
>     // We'd be 'checkpointed' because we specify a checkpoint directory,
>     // which makes isCheckpointed true
>     JavaStreamingContext jssc = params.isCheckpointed()
>         ? createCheckpointedContext(sparkConf, params)
>         : createContext(sparkConf, params);
>     jssc.start();
>     jssc.awaitTermination();
>     jssc.close();
>
>
>
> On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Are you sure you are not accidentally recovering from checkpoint? How are
>> you using StreamingContext.getOrCreate() in your code?
>>
>> TD
>>
>> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Tathagata,
>>>
>>> In our logs I see the batch duration millis being set first to 10 then
>>> to 20 seconds. I don't see the 20 being reflected later during ingestion.
>>>
>>> In the Spark UI under Streaming I see the below output, notice the *10
>>> second* Batch interval.  Can you think of a reason why it's stuck at
>>> 10?  It used to be 1 second by the way, then somehow over the course of a
>>> few restarts we managed to get it to be 10 seconds.  Now it won't get reset
>>> to 20 seconds.  Any ideas?
>>>
>>> Streaming
>>>
>>>    - *Started at: *Thu Sep 03 10:59:03 EDT 2015
>>>    - *Time since start: *1 day 8 hours 44 minutes
>>>    - *Network receivers: *0
>>>    - *Batch interval: *10 seconds
>>>    - *Processed batches: *11790
>>>    - *Waiting batches: *0
>>>    - *Received records: *0
>>>    - *Processed records: *0
>>>
>>>
>>>
>>> Statistics over last 100 processed batches
>>>
>>> Receiver Statistics
>>> No receivers
>>>
>>> Batch Processing Statistics
>>>
>>>    Metric            Last batch  Minimum  25th percentile  Median  75th percentile  Maximum
>>>    Processing Time   23 ms       7 ms     10 ms            11 ms   14 ms            172 ms
>>>    Scheduling Delay  1 ms        0 ms     0 ms             0 ms    1 ms             2 ms
>>>    Total Delay       24 ms       8 ms     10 ms            12 ms   14 ms            173 ms
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Could you see what the streaming tab in the Spark UI says? It should
>>>> show the underlying batch duration of the StreamingContext, the details of
>>>> when the batch starts, etc.
>>>>
>>>> BTW, it seems that the 5.6 or 6.8 seconds delay is present only when
>>>> data is present (that is, *Documents processed: > 0*).
>>>>
>>>> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> Tathagata,
>>>>>
>>>>> Checkpointing is turned on but we were not recovering. I'm looking at
>>>>> the logs now, feeding fresh content hours after the restart. Here's a
>>>>> snippet:
>>>>>
>>>>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>>>>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>>>>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>>>>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>>>>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>>>>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>>>>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>>>>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>>>>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>>>>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>>>>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>>>>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>>>>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>>>>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>>>>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>>>>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>>>>> ...
>>>>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>>>>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>>>>
>>>>> Interestingly, the fresh content (4 documents) is fed about 5.6
>>>>> seconds after the previous batch, not 10 seconds. The second fresh batch
>>>>> comes in 6.8 seconds after the previous empty one.
>>>>>
>>>>> Granted, the log message is printed after iterating over the messages
>>>>> which may account for some time differences. But generally, looking at the
>>>>> log messages being printed before we iterate, it's still 10 seconds each
>>>>> time, not 20 which is what batchdurationmillis is currently set to.
>>>>>
>>>>> Code:
>>>>>
>>>>> JavaPairInputDStream<String, String> messages =
>>>>>     KafkaUtils.createDirectStream(....);
>>>>> messages.checkpoint(Durations.milliseconds(checkpointMillis));
>>>>>
>>>>> JavaDStream<String> messageBodies = messages.map(
>>>>>     new Function<Tuple2<String, String>, String>() {
>>>>>       @Override
>>>>>       public String call(Tuple2<String, String> tuple2) {
>>>>>         return tuple2._2();
>>>>>       }
>>>>>     });
>>>>>
>>>>> messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>   @Override
>>>>>   public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>     ProcessPartitionFunction func = new ProcessPartitionFunction(...);
>>>>>     rdd.foreachPartition(func);
>>>>>     return null;
>>>>>   }
>>>>> });
>>>>>
>>>>> The log message comes from ProcessPartitionFunction:
>>>>>
>>>>> public void call(Iterator<String> messageIterator) throws Exception {
>>>>>     log.info("Starting data partition processing. AppName={}, topic={}.)...",
>>>>>         appName, topic);
>>>>>     // ... iterate ...
>>>>>     log.info("Finished data partition processing (appName={}, topic={}). "
>>>>>         + "Documents processed: {}.", appName, topic, docCount);
>>>>> }
>>>>>
>>>>> Any ideas? Thanks.
>>>>>
>>>>> - Dmitry
>>>>>
>>>>> On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <t...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Are you accidentally recovering from checkpoint files that have 10
>>>>>> seconds as the batch interval?
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <
>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm seeing an oddity where I initially set the batchdurationmillis
>>>>>>> to 1 second and it works fine:
>>>>>>>
>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>>> Durations.milliseconds(batchDurationMillis));
>>>>>>>
>>>>>>> Then I tried changing the value to 10 seconds. The change didn't
>>>>>>> seem to take. I've bounced the Spark workers and the consumers, and
>>>>>>> now I'm seeing RDDs coming in roughly once every 10 seconds (not
>>>>>>> always exactly 10 seconds, according to the logs).
>>>>>>>
>>>>>>> However, now I'm trying to change the value to 20 seconds and it's
>>>>>>> just not taking. I've bounced the Spark master, workers, and consumers,
>>>>>>> and the value seems "stuck" at 10 seconds.
>>>>>>>
>>>>>>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> - Dmitry
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
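
The timing figures discussed in the thread can be checked directly from the quoted log timestamps. The sketch below is a small worked calculation, assuming the log timestamps use the `yyyy-MM-dd HH:mm:ss,SSS` layout shown in the excerpt; the class name `LogGaps` and the method `gapMillis` are hypothetical helpers, not part of the application. Measured between adjacent log lines, the two non-empty batches arrive about 15.6 and 16.8 seconds after the previous line, which is 5.6 and 6.8 seconds past the expected 10-second tick (no 06:12:50 or 06:17:40 line appears in the excerpt).

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Computes the gap between two log timestamps (hypothetical helper for illustration). */
final class LogGaps {
    // Assumed timestamp layout, matching the quoted log excerpt.
    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");

    /** Returns the number of milliseconds from the earlier timestamp to the later one. */
    static long gapMillis(String earlier, String later) {
        LocalDateTime a = LocalDateTime.parse(earlier, FMT);
        LocalDateTime b = LocalDateTime.parse(later, FMT);
        return Duration.between(a, b).toMillis();
    }

    public static void main(String[] args) {
        // Two consecutive empty batches: a normal 10-second tick.
        System.out.println(gapMillis("2015-09-04 06:12:30,011", "2015-09-04 06:12:40,012")); // 10001
        // Last empty batch before the 4-document batch.
        System.out.println(gapMillis("2015-09-04 06:12:40,012", "2015-09-04 06:12:55,629")); // 15617
        // Last empty batch before the 40-document batch.
        System.out.println(gapMillis("2015-09-04 06:17:30,019", "2015-09-04 06:17:46,832")); // 16813
    }
}
```

Because the message is logged only after a partition has been fully iterated, the extra 5.6 and 6.8 seconds are consistent with processing time on batches that contain data rather than with a different batch interval.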
