I'll check for that log info message. Meanwhile, the code is basically this:
public class KafkaSparkStreamingDriver implements Serializable {

  ......

  SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
  JavaStreamingContext jssc = params.isCheckpointed()
      ? createCheckpointedContext(sparkConf, params)
      : createContext(sparkConf, params);
  jssc.start();
  jssc.awaitTermination();
  jssc.close();

  ......

  private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf,
      Parameters params) {
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        return createContext(sparkConf, params);
      }
    };
    return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
  }

  .......

  private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {
    // Create the context with the specified batch interval, in milliseconds.
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
        Durations.milliseconds(params.getBatchDurationMillis()));

    // Set the checkpoint directory, if we're checkpointing.
    if (params.isCheckpointed()) {
      jssc.checkpoint(params.getCheckpointDir());
    }

    Set<String> topicsSet = new HashSet<String>(Arrays.asList(params.getTopic()));

    // Set the Kafka parameters.
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params.getBrokerList());
    if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
      kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params.getAutoOffsetReset());
    }

    // Create a direct Kafka stream with the brokers and the topic.
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topicsSet);

    // See if there's an override of the default checkpoint duration.
    if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
      messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
    }

    .............

On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen <so...@cloudera.com> wrote:

> If you've set the checkpoint dir, it seems like indeed the intent is
> to use a default checkpoint interval in DStream:
>
> private[streaming] def initialize(time: Time) {
>   ...
>   // Set the checkpoint interval to be slideDuration or 10 seconds,
>   // which ever is larger
>   if (mustCheckpoint && checkpointDuration == null) {
>     checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
>     logInfo("Checkpoint interval automatically set to " + checkpointDuration)
>   }
>
> Do you see that log message? What's the interval? That could at least
> explain why it's not doing anything, if it's quite long.
>
> It sort of seems wrong, though, since
> https://spark.apache.org/docs/latest/streaming-programming-guide.html
> suggests it was intended to be a multiple of the batch interval. The
> slide duration wouldn't always be relevant anyway.
>
> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
> <dgoldenberg...@gmail.com> wrote:
> > I've instrumented checkpointing per the programming guide, and I can
> > tell that Spark Streaming is creating the checkpoint directories, but
> > I'm not seeing any content being created in those directories, nor am
> > I seeing the effects I'd expect from checkpointing. I'd expect any data
> > that comes into Kafka while the consumers are down to get picked up
> > when the consumers are restarted; I'm not seeing that.
> >
> > For now, my checkpoint directory is set to the local file system, with
> > the directory URI being of this form: file:///mnt/dir1/dir2. I see a
> > subdirectory named with a UUID being created under there, but no files.
> >
> > I'm using a custom JavaStreamingContextFactory which creates a
> > JavaStreamingContext with the directory set into it via the
> > checkpoint(String) method.
> >
> > I'm currently not invoking the checkpoint(Duration) method on the
> > DStream, since I want to first rely on Spark's default checkpointing
> > interval. My streaming batch duration is set to 1 second.
> >
> > Anyone have any idea what might be going wrong?
> >
> > Also, at which point does Spark delete files from checkpointing?
> >
> > Thanks.
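
For reference, one way to take the automatically chosen interval out of the picture is to call checkpoint(Duration) on the DStream explicitly, since the auto-set branch Sean quoted only runs when checkpointDuration is null. Below is a minimal sketch against the same Spark 1.x Kafka direct-stream API used in the driver above; the app name, topic, and broker address are hypothetical placeholders, and the 10-second interval simply follows the programming guide's suggestion of roughly 5-10x the batch interval:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class CheckpointIntervalSketch {
  public static void main(String[] args) throws Exception {
    // 1-second batch duration and the checkpoint dir from the original post.
    SparkConf sparkConf = new SparkConf().setAppName("checkpoint-test"); // hypothetical app name
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    jssc.checkpoint("file:///mnt/dir1/dir2");

    Set<String> topics = new HashSet<String>(Arrays.asList("mytopic")); // hypothetical topic
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "localhost:9092");          // hypothetical broker

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topics);

    // Explicit checkpoint interval, a multiple of the 1-second batch interval.
    // With this set, checkpointDuration is non-null, so the "Checkpoint interval
    // automatically set to ..." branch in DStream.initialize() never fires.
    messages.checkpoint(Durations.seconds(10));

    jssc.start();
    jssc.awaitTermination();
  }
}

With the interval pinned explicitly like this, whether files appear under file:///mnt/dir1/dir2 within a few batches becomes a much more direct test of the checkpointing setup than waiting on whatever default interval was chosen.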