Hi
What is the right way to use the Spark 2.0 state store feature in Spark Streaming? I referred to the test cases in this pull request (https://github.com/apache/spark/pull/11645/files) and implemented word count using the state store. My source is Kafka (1 topic, 10 partitions), and my data pump is pushing numbers into random partitions. I understand that the state store maintains state per partition, so I am applying partitionBy before calling mapPartitionsWithStateStore.
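For context on why the explicit partitionBy matters: HashPartitioner is deterministic, so a given key lands in the same partition in every micro-batch, which is what lets that partition's state store find the key's previous count. Below is a minimal, dependency-free sketch of the idea; partitionFor is an illustrative helper that mirrors the non-negative-modulus logic of Spark's HashPartitioner, not a Spark API:

```scala
// Illustrative stand-in for HashPartitioner's getPartition: a non-negative
// modulus of the key's hashCode. Deterministic per key, so the same word
// is routed to the same partition (and state store) in every micro-batch.
def partitionFor(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

val p = partitionFor("1", 20)
assert(p == partitionFor("1", 20)) // stable across calls/batches
assert(p >= 0 && p < 20)
```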
The problem I am facing is that after some time I start getting wrong running counts. My data pump pushes the number 1 every 5 seconds, which is the same as the micro-batch duration. The first 20 micro-batches ran fine, but in the 21st micro-batch the state of 1 somehow got reset and I got count=1; please see the console output below.
Code of my streaming app:

    val keySchema = StructType(Seq(StructField("key", StringType, true)))
    val valueSchema = StructType(Seq(StructField("value", IntegerType, true)))
    val stateStoreCheckpointPath = "/data/spark/stateStoreCheckpoints/"
    var stateStoreVersion: Long = 0

    val stateStoreWordCount = (store: StateStore, iter: Iterator[String]) => {
      val out = new ListBuffer[(String, Int)]
      iter.foreach { s =>
        val current = store.get(stringToRow(s)).map(rowToInt).getOrElse(0) + 1
        store.put(stringToRow(s), intToRow(current))
        out.append((s, current))
      }
      store.commit()
      out.iterator
    }

    val opId = 100
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      .flatMap(r => r._2.split(" "))
      .foreachRDD { (rdd, time) =>
        rdd
          .map(r => (r, null))
          .partitionBy(new HashPartitioner(20))
          .map(r => r._1)
          .mapPartitionsWithStateStore(sqlContext, stateStoreCheckpointPath, opId,
            storeVersion = stateStoreVersion, keySchema, valueSchema)(stateStoreWordCount)
          .collect
          .foreach(r => println(time + " - " + r))
        stateStoreVersion += 1
        println(time + " batch finished")
      }
Code of my data pump:

    val list = List(1,2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,67,71,73,79,83,89,97)
    while (true) {
      list.foreach { r =>
        if (count % r == 0) {
          val producerRecord = new ProducerRecord(Configs.topic, new Random().nextInt(10), "", r.toString)
          producer.send(producerRecord)
        }
      }
      count += 1
      Thread.sleep(5000)
    }
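As a sanity check on the pump's schedule: on each tick a number r from the list is emitted when count % r == 0 and then count is incremented, so (assuming count starts at 1, which is what the console output implies) after n ticks r has been emitted n / r times, integer division. In particular, 1 is emitted once per 5-second tick, i.e. once per micro-batch. A small sketch; emittedAfter is an illustrative helper, not part of the app:

```scala
// Expected number of times r has been produced after n ticks of the pump,
// assuming count runs 1, 2, ..., n: one emission per multiple of r up to n.
def emittedAfter(r: Int, n: Int): Int = n / r

// These match the running counts the console shows at the 20th micro-batch:
assert(emittedAfter(1, 20) == 20) // (1,20)
assert(emittedAfter(2, 20) == 10) // (2,10)
assert(emittedAfter(5, 20) == 4)  // (5,4)
```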
The complete code is available here: https://github.com/zuxqoj/HelloWorld/tree/master/SparkStreamingStateStore/src/main/scala/spark/streaming/statestore/test

I am using Spark on YARN in client mode.

spark - spark-2.0.0-snapshot (git sha - 6c5768594fe8b910125f06e1308a8154a199447e) - May 13, 2016
scala - 2.10.2
java - 1.8
hadoop - 2.7.1
kafka - 0.8.2.1

Spark config:
spark.executor.cores=12
spark.executor.instances=6
Console output (micro-batches are 5000 ms apart; truncated timestamps restored from that cadence):

    1463566640000 ms - (1,1)
    1463566640000 ms batch finished
    1463566645000 ms - (1,2)
    1463566645000 ms - (2,1)
    1463566645000 ms batch finished
    1463566650000 ms - (1,3)
    1463566650000 ms - (3,1)
    1463566650000 ms batch finished
    1463566655000 ms - (1,4)
    1463566655000 ms - (2,2)
    1463566655000 ms batch finished
    1463566660000 ms - (1,5)
    1463566660000 ms - (5,1)
    1463566660000 ms batch finished
    1463566665000 ms - (1,6)
    1463566665000 ms - (2,3)
    1463566665000 ms - (3,2)
    1463566665000 ms batch finished
    1463566670000 ms - (1,7)
    1463566670000 ms - (7,1)
    1463566670000 ms batch finished
    1463566675000 ms - (1,8)
    1463566675000 ms - (2,4)
    1463566675000 ms batch finished
    1463566680000 ms - (1,9)
    1463566680000 ms - (3,3)
    1463566680000 ms batch finished
    1463566685000 ms - (1,10)
    1463566685000 ms - (2,5)
    1463566685000 ms - (5,2)
    1463566685000 ms batch finished
    1463566690000 ms - (11,1)
    1463566690000 ms - (1,11)
    1463566690000 ms batch finished
    1463566695000 ms - (1,12)
    1463566695000 ms - (2,6)
    1463566695000 ms - (3,4)
    1463566695000 ms batch finished
    1463566700000 ms - (1,13)
    1463566700000 ms - (13,1)
    1463566700000 ms batch finished
    1463566705000 ms - (1,14)
    1463566705000 ms - (2,7)
    1463566705000 ms - (7,2)
    1463566705000 ms batch finished
    1463566710000 ms - (1,15)
    1463566710000 ms - (3,5)
    1463566710000 ms - (5,3)
    1463566710000 ms batch finished
    1463566715000 ms - (1,16)
    1463566715000 ms - (2,8)
    1463566715000 ms batch finished
    1463566720000 ms - (1,17)
    1463566720000 ms - (17,1)
    1463566720000 ms batch finished
    1463566725000 ms - (1,18)
    1463566725000 ms - (2,9)
    1463566725000 ms - (3,6)
    1463566725000 ms batch finished
    1463566730000 ms - (1,19)
    1463566730000 ms - (19,1)
    1463566730000 ms batch finished
    1463566735000 ms - (1,20)
    1463566735000 ms - (2,10)
    1463566735000 ms - (5,4)
    1463566735000 ms batch finished
    1463566740000 ms - (1,1)   <-- count got reset
    1463566740000 ms - (3,7)
    1463566740000 ms - (7,1)   <-- count got reset
    1463566740000 ms batch finished
    1463566745000 ms - (11,2)
    1463566745000 ms - (1,2)
    1463566745000 ms - (2,11)
    1463566745000 ms batch finished
    1463566750000 ms - (23,1)
    1463566750000 ms - (1,3)
    1463566750000 ms batch finished
    1463566755000 ms -