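Hi,

What is the right way to use the Spark 2.0 state store feature in Spark Streaming? I referred to the test cases in this pull request (https://github.com/apache/spark/pull/11645/files) and implemented a word count using the state store.

My source is Kafka (1 topic, 10 partitions). My data pump pushes numbers into random partitions. I understand that the state store maintains state per partition, so I apply `partitionBy` before calling `mapPartitionsWithStateStore`.

The problem I am facing is that after some time I start getting wrong running counts. My data pump pushes the number 1 every 5 seconds, which is the same as the micro-batch duration. The first 20 micro-batches ran fine, but in the 21st micro-batch the state of 1 somehow got reset and I got count=1 (the count for 7 was reset in the same batch). Please see the console output.

Code of my streaming app:

```scala
import scala.collection.mutable.ListBuffer
import kafka.serializer.StringDecoder
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.execution.streaming.state._ // StateStore and mapPartitionsWithStateStore
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.kafka.KafkaUtils

// ssc, sqlContext, kafkaParams and topicsSet are defined elsewhere in the app.
// stringToRow, intToRow and rowToInt convert between String/Int and rows; not shown in this snippet.
val keySchema = StructType(Seq(StructField("key", StringType, true)))
val valueSchema = StructType(Seq(StructField("value", IntegerType, true)))

val stateStoreCheckpointPath = "/data/spark/stateStoreCheckpoints/"
var stateStoreVersion: Long = 0

// Runs once per partition: reads the previous count of each word,
// increments it, writes it back, then commits a new store version.
val stateStoreWordCount = (store: StateStore, iter: Iterator[String]) => {
  val out = new ListBuffer[(String, Int)]
  iter.foreach { s =>
    val current = store.get(stringToRow(s)).map(rowToInt).getOrElse(0) + 1
    store.put(stringToRow(s), intToRow(current))
    out.append((s, current))
  }
  store.commit()
  out.iterator
}

val opId = 100
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
  .flatMap(r => r._2.split(" "))
  .foreachRDD((rdd, time) => {
    rdd
      .map(r => (r, null))
      .partitionBy(new HashPartitioner(20)) // route each word to a fixed partition
      .map(r => r._1)
      .mapPartitionsWithStateStore(sqlContext, stateStoreCheckpointPath, opId,
        storeVersion = stateStoreVersion, keySchema, valueSchema)(stateStoreWordCount)
      .collect()
      .foreach(r => println(time + " - " + r))
    stateStoreVersion += 1
    println(time + " batch finished")
  })
```

Code of my data pump:

```scala
import scala.util.Random
import org.apache.kafka.clients.producer.ProducerRecord

// count, producer and Configs are defined elsewhere in the pump.
val list = List(1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97)
while (true) {
  list.foreach(r => {
    // Send r on every tick whose count is a multiple of r, to a random partition (0-9).
    if (count % r == 0) {
      val productRecord = new ProducerRecord(Configs.topic, new Random().nextInt(10), "", r.toString)
      producer.send(productRecord)
    }
  })
  count += 1
  Thread.sleep(5000)
}
```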
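As a sanity check, the pump logic implies what the correct running counts should be: number `r` is sent on every tick where `count % r == 0`, so after tick `n` its count should be `n / r`. The sketch below is a hypothetical helper, not part of the app. It assumes `count` starts at 1 (its initialisation is not shown in the snippet; 1 is what the console output is consistent with) and that each 5-second batch receives exactly one tick. It also mirrors the non-negative-modulo rule Spark's `HashPartitioner` applies to a non-null key's `hashCode`, to show which of the 20 partitions (and therefore which state store) each word lands in. `ExpectedCounts`, `emitted`, `expectedCount` and `partitionFor` are illustrative names.

```scala
// Hypothetical sanity check, not part of the streaming app.
// Assumption: the pump's `count` starts at 1 and one tick falls into each batch.
object ExpectedCounts {
  val list = List(1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47,
    53, 59, 61, 67, 71, 73, 79, 83, 89, 97)

  // Numbers the pump sends on tick n (same test as `count % r == 0`).
  def emitted(n: Int): List[Int] = list.filter(r => n % r == 0)

  // Correct running count for r after tick n: r was sent on ticks r, 2r, ...
  def expectedCount(r: Int, n: Int): Int = n / r

  // Same rule as HashPartitioner for a non-null key:
  // non-negative modulo of the key's hashCode.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val batch = 21
    for (r <- emitted(batch)) {
      val key = r.toString // the app partitions on the String word
      println(s"batch $batch: ($key,${expectedCount(r, batch)}) -> partition ${partitionFor(key, 20)} of 20")
    }
  }
}
```

For batch 21 this gives (1,21), (3,7) and (7,3), in partitions 9, 11 and 15 respectively, whereas the console output shows (1,1), (3,7) and (7,1) for that batch.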
Complete code is available here: https://github.com/zuxqoj/HelloWorld/tree/master/SparkStreamingStateStore/src/main/scala/spark/streaming/statestore/test

I am running Spark on YARN in client mode.

- Spark: spark-2.0.0-SNAPSHOT (git sha 6c5768594fe8b910125f06e1308a8154a199447e, May 13, 2016)
- Scala: 2.10.2
- Java: 1.8
- Hadoop: 2.7.1
- Kafka: 0.8.2.1

Spark config:

```
spark.executor.cores=12
spark.executor.instances=6
```

Console output:

```
1463566640000 ms - (1,1)
1463566640000 ms batch finished
1463566645000 ms - (1,2)
1463566645000 ms - (2,1)
1463566645000 ms batch finished
1463566650000 ms - (1,3)
1463566650000 ms - (3,1)
1463566650000 ms batch finished
1463566655000 ms - (1,4)
1463566655000 ms - (2,2)
1463566655000 ms batch finished
1463566660000 ms - (1,5)
1463566660000 ms - (5,1)
1463566660000 ms batch finished
1463566665000 ms - (1,6)
1463566665000 ms - (2,3)
1463566665000 ms - (3,2)
1463566665000 ms batch finished
1463566670000 ms - (1,7)
1463566670000 ms - (7,1)
1463566670000 ms batch finished
1463566675000 ms - (1,8)
1463566675000 ms - (2,4)
1463566675000 ms batch finished
1463566680000 ms - (1,9)
1463566680000 ms - (3,3)
1463566680000 ms batch finished
1463566685000 ms - (1,10)
1463566685000 ms - (2,5)
1463566685000 ms - (5,2)
1463566685000 ms batch finished
1463566690000 ms - (11,1)
1463566690000 ms - (1,11)
1463566690000 ms batch finished
1463566695000 ms - (1,12)
1463566695000 ms - (2,6)
1463566695000 ms - (3,4)
1463566695000 ms batch finished
1463566700000 ms - (1,13)
1463566700000 ms - (13,1)
1463566700000 ms batch finished
1463566705000 ms - (1,14)
1463566705000 ms - (2,7)
1463566705000 ms - (7,2)
1463566705000 ms batch finished
1463566710000 ms - (1,15)
1463566710000 ms - (3,5)
1463566710000 ms - (5,3)
1463566710000 ms batch finished
1463566715000 ms - (1,16)
1463566715000 ms - (2,8)
1463566715000 ms batch finished
1463566720000 ms - (1,17)
1463566720000 ms - (17,1)
1463566720000 ms batch finished
1463566725000 ms - (1,18)
1463566725000 ms - (2,9)
1463566725000 ms - (3,6)
1463566725000 ms batch finished
1463566730000 ms - (1,19)
1463566730000 ms - (19,1)
1463566730000 ms batch finished
1463566735000 ms - (1,20)
1463566735000 ms - (2,10)
1463566735000 ms - (5,4)
1463566735000 ms batch finished
1463566740000 ms - (1,1) <------------------ count got reset
1463566740000 ms - (3,7)
1463566740000 ms - (7,1) <------------------ count got reset
1463566740000 ms batch finished
1463566745000 ms - (11,2)
1463566745000 ms - (1,2)
1463566745000 ms - (2,11)
1463566745000 ms batch finished
1463566750000 ms - (23,1)
1463566750000 ms - (1,3)
1463566750000 ms batch finished
1463566755000 ms - (1,4)
1463566755000 ms - (2,12)
1463566755000 ms - (3,8)
1463566755000 ms batch finished
```

Thanks,
Shekhar