Sorry, more relevant code below:

SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
JavaStreamingContext jssc = params.isCheckpointed()
    ? createCheckpointedContext(sparkConf, params)
    : createContext(sparkConf, params);
jssc.start();
jssc.awaitTermination();
jssc.close();

private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf, Parameters params) {
  JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
      return createContext(sparkConf, params);
    }
  };
  return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
}
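// A quick way to tell which path getOrCreate() takes: the factory's create()
// runs only when no checkpoint data exists in the directory, so a log line
// inside it acts as a probe. Minimal sketch, assuming an SLF4J-style logger
// named 'log'; the method name and message wording are illustrative, not from
// the original code.
private JavaStreamingContext createCheckpointedContextWithProbe(final SparkConf sparkConf,
    final Parameters params) {
  JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
      // If this line never appears in the driver log at startup, the context,
      // including its original batch duration, was restored from the checkpoint,
      // and the current batchDurationMillis setting is ignored.
      log.info("No checkpoint found; creating a fresh context, batchDurationMillis={}",
          params.getBatchDurationMillis());
      return createContext(sparkConf, params);
    }
  };
  return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
}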
private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {
  // Create the context with the specified batch interval, in milliseconds.
  JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
      Durations.milliseconds(params.getBatchDurationMillis()));

  // Set the checkpoint directory, if we're checkpointing.
  if (params.isCheckpointed()) {
    jssc.checkpoint(params.getCheckpointDir());
  }

  Set<String> topicsSet = new HashSet<String>(Arrays.asList(params.getTopic()));

  // Set the Kafka parameters.
  Map<String, String> kafkaParams = new HashMap<String, String>();
  kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params.getBrokerList());
  if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
    kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params.getAutoOffsetReset());
  }

  // Create a direct Kafka stream with the brokers and the topic.
  JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
      jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
      kafkaParams, topicsSet);

  // See if there's an override of the default checkpoint duration.
  if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
    messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
  }

  JavaDStream<String> messageBodies = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
      return tuple2._2();
    }
  });

  messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
      ProcessPartitionFunction func = new ProcessPartitionFunction(params);
      rdd.foreachPartition(func);
      return null;
    }
  });

  return jssc;
}

On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

> I'd think that we wouldn't be "accidentally recovering from checkpoint"
> hours or even days after the consumers have been restarted. Also, the
> content coming through is the fresh content I'm feeding now, not content
> that had been fed before the last restart.
>
> The code is basically as follows:
>
> SparkConf sparkConf = createSparkConf(...);
> // We'd be 'checkpointed' because we specify a checkpoint directory,
> // which makes isCheckpointed true.
> JavaStreamingContext jssc = params.isCheckpointed()
>     ? createCheckpointedContext(sparkConf, params)
>     : createContext(sparkConf, params);
> jssc.start();
> jssc.awaitTermination();
> jssc.close();
>
> On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Are you sure you are not accidentally recovering from checkpoint? How are
>> you using StreamingContext.getOrCreate() in your code?
>>
>> TD
>>
>> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Tathagata,
>>>
>>> In our logs I see batchDurationMillis being set first to 10 and then
>>> to 20 seconds. I don't see the 20 being reflected later, during ingestion.
>>>
>>> In the Spark UI under Streaming I see the output below; notice the *10
>>> second* batch interval. Can you think of a reason why it's stuck at 10?
>>> It used to be 1 second, by the way; then somehow, over the course of a
>>> few restarts, we managed to get it to 10 seconds. Now it won't get reset
>>> to 20 seconds. Any ideas?
>>>
>>> Streaming
>>>
>>> - *Started at: *Thu Sep 03 10:59:03 EDT 2015
>>> - *Time since start: *1 day 8 hours 44 minutes
>>> - *Network receivers: *0
>>> - *Batch interval: *10 seconds
>>> - *Processed batches: *11790
>>> - *Waiting batches: *0
>>> - *Received records: *0
>>> - *Processed records: *0
>>>
>>> Statistics over last 100 processed batches
>>>
>>> Receiver Statistics: no receivers.
>>>
>>> Batch Processing Statistics:
>>>
>>> Metric           | Last batch | Minimum | 25th pctile | Median | 75th pctile | Maximum
>>> Processing Time  | 23 ms      | 7 ms    | 10 ms       | 11 ms  | 14 ms       | 172 ms
>>> Scheduling Delay | 1 ms       | 0 ms    | 0 ms        | 0 ms   | 1 ms        | 2 ms
>>> Total Delay      | 24 ms      | 8 ms    | 10 ms       | 12 ms  | 14 ms       | 173 ms
>>>
>>> On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Could you see what the Streaming tab in the Spark UI says? It should
>>>> show the underlying batch duration of the StreamingContext, the details
>>>> of when each batch starts, etc.
>>>>
>>>> BTW, it seems that the 5.6 or 6.8 second delay is present only when
>>>> data is present (that is, *Documents processed: > 0*).
>>>>
>>>> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> Tathagata,
>>>>>
>>>>> Checkpointing is turned on, but we were not recovering. I'm looking at
>>>>> the logs now, feeding fresh content hours after the restart. Here's a
>>>>> snippet:
>>>>>
>>>>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>>>>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>>>>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>>>>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>>>>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>>>>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>>>>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>>>>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>>>>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>>>>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>>>>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>>>>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>>>>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>>>>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>>>>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>>>>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>>>>> ...
>>>>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>>>>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>>>>
>>>>> Interestingly, the fresh content (4 documents) is logged about 5.6
>>>>> seconds after the previous batch, not 10 seconds. The second fresh
>>>>> batch comes in 6.8 seconds after the previous empty one.
>>>>>
>>>>> Granted, the log message is printed after iterating over the messages,
>>>>> which may account for some of the time differences. But generally,
>>>>> looking at the log messages printed before we iterate, it's still 10
>>>>> seconds each time, not the 20 that batchDurationMillis is currently
>>>>> set to.
>>>>>
>>>>> Code:
>>>>>
>>>>> JavaPairInputDStream<String, String> messages =
>>>>>     KafkaUtils.createDirectStream(....);
>>>>> messages.checkpoint(Durations.milliseconds(checkpointMillis));
>>>>>
>>>>> JavaDStream<String> messageBodies = messages.map(
>>>>>     new Function<Tuple2<String, String>, String>() {
>>>>>   @Override
>>>>>   public String call(Tuple2<String, String> tuple2) {
>>>>>     return tuple2._2();
>>>>>   }
>>>>> });
>>>>>
>>>>> messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>   @Override
>>>>>   public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>     ProcessPartitionFunction func = new ProcessPartitionFunction(...);
>>>>>     rdd.foreachPartition(func);
>>>>>     return null;
>>>>>   }
>>>>> });
>>>>>
>>>>> The log message comes from ProcessPartitionFunction:
>>>>>
>>>>> public void call(Iterator<String> messageIterator) throws Exception {
>>>>>   log.info("Starting data partition processing (appName={}, topic={})...",
>>>>>       appName, topic);
>>>>>   // ... iterate ...
>>>>>   log.info("Finished data partition processing (appName={}, topic={}). "
>>>>>       + "Documents processed: {}.", appName, topic, docCount);
>>>>> }
>>>>>
>>>>> Any ideas? Thanks.
>>>>>
>>>>> - Dmitry
>>>>>
>>>>> On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <t...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Are you accidentally recovering from checkpoint files which have a
>>>>>> 10-second batch interval?
>>>>>>
>>>>>> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <
>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm seeing an oddity where I initially set batchDurationMillis to
>>>>>>> 1 second and it works fine:
>>>>>>>
>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>>>     Durations.milliseconds(batchDurationMillis));
>>>>>>>
>>>>>>> Then I tried changing the value to 10 seconds. The change didn't
>>>>>>> seem to take. I bounced the Spark workers and the consumers, and
>>>>>>> now I'm seeing RDDs coming in roughly every 10 seconds (not always
>>>>>>> exactly 10 seconds, according to the logs).
>>>>>>>
>>>>>>> However, now I'm trying to change the value to 20 seconds and it's
>>>>>>> just not taking. I've bounced the Spark master, the workers, and
>>>>>>> the consumers, and the value seems "stuck" at 10 seconds.
>>>>>>>
>>>>>>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> - Dmitry
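For what it's worth, the usual explanation for this symptom (the resolution isn't shown in the thread itself): the batch duration is serialized into the checkpoint data, so JavaStreamingContext.getOrCreate() restores the old 10-second context and never calls the factory. Picking up a new interval requires clearing the checkpoint directory, or pointing at a fresh one, at the cost of losing the checkpointed offsets and state. Below is a hedged sketch of clearing a checkpoint directory on HDFS-compatible storage via the Hadoop FileSystem API; clearCheckpoint is an illustrative helper, not part of the code above.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative helper, not from the thread: deletes the checkpoint directory so
// the next startup builds a fresh StreamingContext with the new batch duration.
// Caveat: this discards checkpointed Kafka offsets and state, so consumption
// resumes according to auto.offset.reset rather than from where it left off.
public static void clearCheckpoint(String checkpointDir) throws Exception {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(URI.create(checkpointDir), conf);
  fs.delete(new Path(checkpointDir), true); // 'true' = recursive delete
}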