Are there any examples of how to use StateStore with DStreams? It seems
like the idea would be to create a new store version with each mini-batch,
but I don't quite know how to make that happen. My rough attempt is below.
/**
 * Experimental: maintain running word counts in a StateStore across
 * DStream mini-batches. Reads "key value" pairs from a socket, adds each
 * value to the stored count for its key, and commits a new store version
 * per batch.
 *
 * @param ss the active SparkSession whose SparkContext backs the stream
 */
def run (ss: SparkSession): Unit = {
  val c = new StreamingContext(ss.sparkContext, Seconds(2))
  val stm = c.socketTextStream("localhost", 9999)

  // NOTE(review): this driver-side var is captured by the partition closure
  // below, which executes on executors — the assignment in `finish` mutates
  // the executor's deserialized copy, so the driver's `version` likely never
  // advances past 0. The committed version needs to be communicated back to
  // the driver some other way — TODO confirm.
  var version = 0L

  stm.foreachRDD { (rdd, t) =>
    val counts = rdd
      .map { (s) =>
        // Each input line is expected to be "key value" — TODO confirm;
        // malformed lines will throw a MatchError here.
        val Array(k, v) = s.split(" ")
        (k, v)
      }
      .mapPartitionsWithStateStore(ss.sqlContext, "/Users/msmith/cp", 1,
        version, keySchema, valueSchema) { (store, rows) =>
        // Build the (codegen-backed, expensive) projections once per
        // partition instead of once per record.
        val keyProj = UnsafeProjection.create(keySchema)
        val valProj = UnsafeProjection.create(valueSchema)

        val updates = rows.map { case (k, v) =>
          // UnsafeProjection reuses a single row buffer across invocations,
          // so rows handed to the store must be copied — otherwise a later
          // projection would overwrite the bytes of an earlier put.
          val keyURow = keyProj(InternalRow(UTF8String.fromString(k))).copy()
          val newCount = v.toLong
          val count = store.get(keyURow).map(_.getLong(0)).getOrElse(0L)
          val valURow = valProj(InternalRow(count + newCount)).copy()
          store.put(keyURow, valURow)
          val ret = (k, count + newCount)
          println("ret", ret)
          ret
        }

        // Commit only after `updates` is fully consumed: this lazily
        // evaluated, always-empty tail runs once the iterator is exhausted.
        lazy val finish = Some(("", 0)).flatMap { case (k, v) =>
          println("finish")
          version = store.commit()
          println("commit", version)
          None
        }
        updates ++ finish
      }
    println(counts.collectAsMap())
  }

  c.start()            // Start the computation
  c.awaitTermination() // Wait for the computation to terminate
}