Hi all,

I would like to ask for some advice about resetting a Spark stateful operation.
I tried the following:

JavaStreamingContext jssc = new JavaStreamingContext(context, new Duration(5000));
jssc.remember(new Duration(5 * 60 * 1000));
jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);

JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, "localhost:2181", "test-consumer-group", topicMap);

JavaPairDStream<String, String> windowed = messages.window(WINDOW_LENGTH, SLIDE_INTERVAL);

JavaDStream<LogEntry> lines = windowed.map(new Function<Tuple2<String, String>, LogEntry>() {
    @Override
    public LogEntry call(Tuple2<String, String> tuple2) {
        return Utils.parseLine(tuple2._2());
    }
}).filter(Functions.FILTER_LOG_ENTRY).cache();

JavaPairDStream<String, Long> codes = lines
        .mapToPair(Functions.GET_CODE)
        .reduceByKey(Functions.SUM_REDUCER)
        .updateStateByKey(COMPUTE_RUNNING_SUM);
I thought that by setting remember to 5 minutes, the "codes" state derived
from messages would also be reset after 5 minutes, but in fact it was not.

Is there any way to reset the "codes" RDD after a period of time (5
minutes)?
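As far as I can tell, remember() only controls how long Spark keeps old RDDs around for window operations; it does not expire updateStateByKey state, which lives until the update function returns an absent value. One workaround (a sketch of the idea, not Spark's actual API) is to timestamp the state and have the update function itself restart or drop state older than the TTL. The CodeState class, TTL_MS constant, and the explicit nowMs parameter below are all my own inventions for illustration; the real Function2 passed to updateStateByKey only receives (newValues, state) and would read the clock itself with System.currentTimeMillis().

```java
import java.io.Serializable;
import java.util.List;
import java.util.Optional;

// Hypothetical per-key state: the running sum plus the time this
// window of state was started.
class CodeState implements Serializable {
    final long sum;
    final long startedAtMs;

    CodeState(long sum, long startedAtMs) {
        this.sum = sum;
        this.startedAtMs = startedAtMs;
    }
}

public class StateExpiry {
    static final long TTL_MS = 5 * 60 * 1000; // reset state after 5 minutes

    // Core of a time-aware update function: keep accumulating while the
    // state is fresh; once it is older than TTL_MS, start over from the
    // new values only (or drop the key if there are none).
    public static Optional<CodeState> update(List<Long> newValues,
                                             Optional<CodeState> state,
                                             long nowMs) {
        long newSum = newValues.stream().mapToLong(Long::longValue).sum();

        if (state.isPresent() && nowMs - state.get().startedAtMs < TTL_MS) {
            // State is still fresh: add the new values to the running sum.
            return Optional.of(
                    new CodeState(state.get().sum + newSum, state.get().startedAtMs));
        }
        if (newValues.isEmpty()) {
            // State expired (or never existed) and nothing new: drop the key.
            return Optional.empty();
        }
        // State expired: restart the sum from the new values only.
        return Optional.of(new CodeState(newSum, nowMs));
    }
}
```

In Spark 1.x the state type would be wrapped in Guava's Optional rather than java.util.Optional, but the expiry logic is the same.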

Thanks



-- 
Best Regards,
Eko Susilo
