Hi,
In the following example using mapWithState, I set the checkpoint interval to 1
minute. From the log, Spark still writes to the checkpoint directory every
second. I would appreciate it if someone could point out what I have done wrong.
/**
 * Stateful word count over a socket text stream, built on `mapWithState`.
 *
 * Usage: MapWithStateDemo <hostname> <port>
 */
object MapWithStateDemo {

  /**
   * Entry point.
   *
   * @param args `args(0)` is the host and `args(1)` the port of the text
   *             socket to read from (both are passed to `socketTextStream`).
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: MapWithStateDemo <hostname> <port>")
      System.exit(1)
    }

    val sparkConf = new SparkConf()
      .setAppName("MapWithStateDemo")
      .setIfMissing("spark.master", "local[*]")

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Initial state RDD for mapWithState operation
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Create a ReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited test (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using mapWithState.
    // This will give a DStream made of state (which is the cumulative count of
    // the words).
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      // A timeout is configured on the StateSpec below, so this function is also
      // invoked for keys whose state is being timed out; updating the state in
      // that situation is rejected by Spark, hence the guard.
      if (!state.isTimingOut()) {
        state.update(sum)
      }
      output
    }

    val stateDstream: MapWithStateDStream[String, Int, Int, (String, Int)] =
      wordDstream.mapWithState(
        StateSpec.function(mappingFunc).timeout(Seconds(10)).initialState(initialRDD))

    // Checkpoint interval requested for the state stream.
    // NOTE(review): ssc.checkpoint(...) below also turns on checkpointing for the
    // streaming context itself; the writes seen every batch are presumably those
    // metadata checkpoints rather than the state RDDs -- confirm against the
    // Spark Streaming checkpointing documentation.
    stateDstream.checkpoint(Minutes(1L))
    stateDstream.print()

    // Place the checkpoint directory two levels above the classpath root.
    val targetDir = new File(getClass.getResource("/").toURI).getParentFile.getParentFile
    val checkpointDir = s"$targetDir/checkpoint"
    ssc.checkpoint(checkpointDir)

    ssc.start()
    ssc.awaitTermination()
  }
}
Thanks in advance for any assistance!
Shing