Hi all,

I would like to ask for some advice about resetting a Spark stateful operation. I tried the following:
    JavaStreamingContext jssc = new JavaStreamingContext(context, new Duration(5000));
    jssc.remember(new Duration(5 * 60 * 1000));
    jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);

    JavaPairReceiverInputDStream<String, String> messages =
        (JavaPairReceiverInputDStream<String, String>) KafkaUtils.createStream(
            jssc, "localhost:2181", "test-consumer-group", topicMap);

    JavaPairDStream<String, String> windowed = messages.window(WINDOW_LENGTH, SLIDE_INTERVAL);

    JavaDStream<LogEntry> lines = windowed
        .map(new Function<Tuple2<String, String>, LogEntry>() {
            @Override
            public LogEntry call(Tuple2<String, String> tuple2) {
                return Utils.parseLine(tuple2._2());
            }
        })
        .filter(Functions.FILTER_LOG_ENTRY)
        .cache();

    JavaPairDStream<String, Long> codes = lines
        .mapToPair(Functions.GET_CODE)
        .reduceByKey(Functions.SUM_REDUCER)
        .updateStateByKey(COMPUTE_RUNNING_SUM);

I thought that by setting remember to 5 minutes, the state behind the "codes" DStream (derived from messages) would also be reset after 5 minutes, but in fact it is not. Is there any way to reset the "codes" state after a period of time (5 minutes)?

Thanks.

--
Best Regards,
Eko Susilo
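For context, the COMPUTE_RUNNING_SUM update function is along these lines, and below it is one idea I have been experimenting with for the reset: keep a creation timestamp next to the sum and return an empty Optional once the state is older than a TTL (returning absent/empty from the update function is what removes a key from Spark's state). This is a minimal standalone sketch, not the real Spark code: it uses java.util.Optional instead of Spark's own Optional type, and the extra nowMs parameter is a hypothetical helper so the logic can be exercised without a cluster; inside a real updateStateByKey function you would read the clock yourself (e.g. System.currentTimeMillis()).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class RunningSumSketch {
    // Sketch of the COMPUTE_RUNNING_SUM logic: fold the batch's new values
    // into the previous per-key state. (java.util.Optional is used here so
    // this compiles standalone; Spark 1.x has its own Optional type.)
    static Optional<Long> computeRunningSum(List<Long> newValues, Optional<Long> state) {
        long sum = state.orElse(0L);
        for (Long v : newValues) {
            sum += v;
        }
        return Optional.of(sum);
    }

    // Hypothetical TTL for the reset experiment (5 minutes), not a Spark setting.
    static final long TTL_MS = 5 * 60 * 1000;

    // Time-aware variant: state is {sum, createdAtMs}. Once the state is
    // older than TTL_MS, return empty, which (in Spark) drops the key.
    static Optional<long[]> computeRunningSumWithTtl(
            List<Long> newValues, Optional<long[]> state, long nowMs) {
        long createdAt = state.map(s -> s[1]).orElse(nowMs);
        if (nowMs - createdAt > TTL_MS) {
            return Optional.empty(); // reset: key removed from state
        }
        long sum = state.map(s -> s[0]).orElse(0L);
        for (Long v : newValues) {
            sum += v;
        }
        return Optional.of(new long[] { sum, createdAt });
    }

    public static void main(String[] args) {
        System.out.println(computeRunningSum(Arrays.asList(2L, 3L), Optional.of(10L)).get()); // 15

        long t0 = 0L;
        Optional<long[]> s = computeRunningSumWithTtl(Arrays.asList(5L), Optional.empty(), t0);
        s = computeRunningSumWithTtl(Arrays.asList(7L), s, t0 + 1000);
        System.out.println(s.get()[0]); // 12
        s = computeRunningSumWithTtl(Arrays.asList(1L), s, t0 + TTL_MS + 1);
        System.out.println(s.isPresent()); // false -> state was reset
    }
}
```

The resolution of this reset is the batch interval (the update function only runs once per batch), so the state disappears on the first batch after the TTL expires rather than at exactly 5 minutes.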