I'll check for that log info message.
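For reference, with my 1-second batch duration that formula should work out to 1 sec * ceil(10 sec / 1 sec) = 10 seconds, so if the default is being applied I'd expect roughly a 10-second checkpoint interval, which is short enough that files should be showing up in the checkpoint directory pretty quickly. I'll also double-check that INFO-level logging for org.apache.spark.streaming isn't being filtered out by our log4j config.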

Meanwhile, the code is basically this:

public class KafkaSparkStreamingDriver implements Serializable {

......

    SparkConf sparkConf = createSparkConf(appName, kahunaEnv);

    JavaStreamingContext jssc = params.isCheckpointed() ?
      createCheckpointedContext(sparkConf, params) : createContext(sparkConf, params);

    jssc.start();
    jssc.awaitTermination();
    jssc.close();

......

  private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf, Parameters params) {
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        return createContext(sparkConf, params);
      }
    };
    return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
  }

.......

  private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {
    // Create the context with the specified batch interval, in milliseconds.
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
      Durations.milliseconds(params.getBatchDurationMillis()));

    // Set the checkpoint directory, if we're checkpointing.
    if (params.isCheckpointed()) {
      jssc.checkpoint(params.getCheckpointDir());
    }

    Set<String> topicsSet = new HashSet<String>(Arrays.asList(params.getTopic()));

    // Set the Kafka parameters.
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params.getBrokerList());
    if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
      kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params.getAutoOffsetReset());
    }

    // Create a direct Kafka stream with the brokers and the topic.
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
      jssc,
      String.class,
      String.class,
      StringDecoder.class,
      StringDecoder.class,
      kafkaParams,
      topicsSet);

    // See if there's an override of the default checkpoint duration.
    if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
      messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
    }

.............
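
The part elided above wires the transformations and the output operation onto the stream. Simplified, and with illustrative names, it's roughly along these lines; the important bit is that the whole DStream graph is defined inside createContext() so getOrCreate() can rebuild it from the checkpoint:

    // Illustrative sketch only; the real processing is elided above.
    // All transformations and output ops live inside createContext() so that
    // JavaStreamingContext.getOrCreate() can reconstruct the same graph when
    // recovering from the checkpoint directory.
    JavaDStream<String> payloads = messages.map(
        new Function<Tuple2<String, String>, String>() {
          @Override
          public String call(Tuple2<String, String> message) {
            return message._2();  // the Kafka message value
          }
        });

    payloads.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) {
        // process the batch (placeholder)
        return null;
      }
    });

    return jssc;
  }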




On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen <so...@cloudera.com> wrote:

> If you've set the checkpoint dir, it seems like indeed the intent is
> to use a default checkpoint interval in DStream:
>
> private[streaming] def initialize(time: Time) {
> ...
>   // Set the checkpoint interval to be slideDuration or 10 seconds,
> which ever is larger
>   if (mustCheckpoint && checkpointDuration == null) {
>     checkpointDuration = slideDuration * math.ceil(Seconds(10) /
> slideDuration).toInt
>     logInfo("Checkpoint interval automatically set to " +
> checkpointDuration)
>   }
>
> Do you see that log message? what's the interval? that could at least
> explain why it's not doing anything, if it's quite long.
>
> It sort of seems wrong though since
> https://spark.apache.org/docs/latest/streaming-programming-guide.html
> suggests it was intended to be a multiple of the batch interval. The
> slide duration wouldn't always be relevant anyway.
>
> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
> <dgoldenberg...@gmail.com> wrote:
> > I've instrumented checkpointing per the programming guide and I can tell
> > that Spark Streaming is creating the checkpoint directories but I'm not
> > seeing any content being created in those directories nor am I seeing the
> > effects I'd expect from checkpointing.  I'd expect any data that comes into
> > Kafka while the consumers are down, to get picked up when the consumers are
> > restarted; I'm not seeing that.
> >
> > For now my checkpoint directory is set to the local file system with the
> > directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
> > subdirectory named with a UUID being created under there but no files.
> >
> > I'm using a custom JavaStreamingContextFactory which creates a
> > JavaStreamingContext with the directory set into it via the
> > checkpoint(String) method.
> >
> > I'm currently not invoking the checkpoint(Duration) method on the DStream
> > since I want to first rely on Spark's default checkpointing interval.  My
> > streaming batch duration millis is set to 1 second.
> >
> > Anyone have any idea what might be going wrong?
> >
> > Also, at which point does Spark delete files from checkpointing?
> >
> > Thanks.
>
