Are there any examples of how to use StateStore with DStreams? It seems like the idea would be to create a new version with each minibatch, but I don't quite know how to make that happen. My lame attempt is below.
/** Word-count-style Spark Streaming job that keeps running per-key counts in a
  * `StateStore`, committing one new store version per micro-batch.
  *
  * @param ss active SparkSession; its SparkContext backs a 2-second
  *           StreamingContext reading whitespace-separated "key value" lines
  *           from localhost:9999.
  *
  * Side effects: prints every updated (key, count) pair and the per-batch
  * collected map to stdout; writes state to the checkpoint dir below.
  */
def run(ss: SparkSession): Unit = {
  val c = new StreamingContext(ss.sparkContext, Seconds(2))
  val stm = c.socketTextStream("localhost", 9999)

  // NOTE(review): this var is captured by the partition closure below and
  // mutated on the executor side. In local mode driver and executor share a
  // JVM so the new version is visible to the next batch; on a real cluster
  // the mutation would NOT propagate back to the driver — confirm deployment
  // mode, or carry the committed version through the RDD result instead.
  var version = 0L

  stm.foreachRDD { (rdd, t) =>
    val data = rdd
      .map { s =>
        // Expect exactly "key value"; MatchError on malformed lines
        // (unchanged from the original behavior).
        val Array(k, v) = s.split(" ")
        (k, v)
      }
      .mapPartitionsWithStateStore(ss.sqlContext, "/Users/msmith/cp", 1, version, keySchema, valueSchema) { (store, rows) =>
        // Hoisted out of the per-record loop: UnsafeProjection.create
        // code-generates and compiles a projection class, which is far too
        // expensive to repeat for every input record.
        val keyProj = UnsafeProjection.create(keySchema)
        val valProj = UnsafeProjection.create(valueSchema)

        val data = rows.map { case (k, v) =>
          // A projection reuses one internal UnsafeRow buffer across calls,
          // so copy() before handing rows to the store. (The original code
          // obtained distinct rows by re-creating the projection per record.)
          val keyURow = keyProj(InternalRow(UTF8String.fromString(k))).copy()
          val newCount = v.toLong
          // Running total: previously stored count (0 for unseen keys) plus
          // this record's increment.
          val count = store.get(keyURow).map(_.getLong(0)).getOrElse(0L)
          val valURow = valProj(InternalRow(count + newCount)).copy()
          store.put(keyURow, valURow)
          val ret = (k, count + newCount)
          println("ret", ret)
          ret
        }

        // The commit must run only after `rows` has been fully consumed.
        // Appending this lazily-forced empty tail keeps the result iterator
        // lazy while guaranteeing commit() executes exactly once at the end.
        lazy val finish = Some(("", 0)).flatMap { case (k, v) =>
          println("finish")
          version = store.commit()
          println("commit", version)
          None
        }
        data ++ finish
      }
    println(data.collectAsMap())
  }

  c.start()            // Start the computation
  c.awaitTermination() // Wait for the computation to terminate
}