Hi
What is the right way of using spark2.0 state store feature in spark 
streaming??I referred test cases in 
this(https://github.com/apache/spark/pull/11645/files) pull request and 
implemented word count using state store.My source is kafka(1 topic, 10 
partitions). My data pump is pushing numbers into random partition.I understand 
that state store maintains state per partition, so I am applying partitionBy 
before calling mapPartitionsWithStateStore.
Problem I am facing is that after some time, I start getting wrong running 
count.My data pump is pushing number 1 every 5 seconds, which is same as 
microbatch duration. First 20 micro batches ran fine but in 21st microbatch 
state of 1 somehow got reset and I got count=1, please see console output.
Code of my streaming app    val keySchema = StructType(Seq(StructField("key", 
StringType, true)))    val valueSchema = StructType(Seq(StructField("value", 
IntegerType, true)))        val stateStoreCheckpointPath = 
"/data/spark/stateStoreCheckpoints/"    var stateStoreVersion:Long = 0        
val stateStoreWordCount = (store: StateStore, iter: Iterator[String]) => {      
val out = new ListBuffer[(String, Int)]      iter.foreach { s =>        val 
current = store.get(stringToRow(s)).map(rowToInt).getOrElse(0) + 1        
store.put(stringToRow(s), intToRow(current))        out.append((s,current))     
 }
      store.commit      out.iterator    }        val opId = 100    
KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](ssc, kafkaParams, topicsSet)      .flatMap(r=>{r._2.split(" ")}) 
     .foreachRDD((rdd, time) =>{        rdd        .map(r=>(r,null))        
.partitionBy(new HashPartitioner(20))        .map(r=>r._1)        
.mapPartitionsWithStateStore(sqlContet, stateStoreCheckpointPath, opId, 
storeVersion = stateStoreVersion, keySchema, valueSchema)(stateStoreWordCount)  
      .collect foreach(r=> println(time  + " - " + r))        
stateStoreVersion+=1        println(time + " batch finished")        }      )
Code of my Data pump    val list = 
List(1,2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,67,71,73,79,83,89,97)
    while (true) {      list.foreach(r=>{        if(count%r==0){          val 
productRecord = new ProducerRecord(Configs.topic,new Random().nextInt(10), "" , 
r.toString)          producer.send(productRecord)        }      })      
count+=1      Thread.sleep(5000);    }

Complete code is available 
here(https://github.com/zuxqoj/HelloWorld/tree/master/SparkStreamingStateStore/src/main/scala/spark/streaming/statestore/test)

I am using spark on yarn in client mode.spark - spark-2.0.0-snapshot (git sha - 
6c5768594fe8b910125f06e1308a8154a199447e)  - May 13, 2016scala - 2.10.2java - 
1.8hadoop - 2.7.1kafka - 0.8.2.1
Spark config:spark.executor.cores=12spark.executor.instances=6


Console Output1463566640000 ms - (1,1)1463566640000 ms batch 
finished1463566645000 ms - (1,2)1463566645000 ms - (2,1)1463566645000 ms batch 
finished1463566650000 ms - (1,3)1463566650000 ms - (3,1)1463566650000 ms batch 
finished1463566655000 ms - (1,4)1463566655000 ms - (2,2)1463566655000 ms batch 
finished1463566660000 ms - (1,5)1463566660000 ms - (5,1)1463566660000 ms batch 
finished1463566665000 ms - (1,6)1463566665000 ms - (2,3)1463566665000 ms - 
(3,2)1463566665000 ms batch finished1463566670000 ms - (1,7)1463566670000 ms - 
(7,1)1463566670000 ms batch finished1463566675000 ms - (1,8)1463566675000 ms - 
(2,4)1463566675000 ms batch finished1463566680000 ms - (1,9)1463566680000 ms - 
(3,3)1463566680000 ms batch finished1463566685000 ms - (1,10)1463566685000 ms - 
(2,5)1463566685000 ms - (5,2)1463566685000 ms batch finished1463566690000 ms - 
(11,1)1463566690000 ms - (1,11)1463566690000 ms batch finished1463566695000 ms 
- (1,12)1463566695000 ms - (2,6)1463566695000 ms - (3,4)1463566695000 ms batch 
finished1463566700000 ms - (1,13)1463566700000 ms - (13,1)1463566700000 ms 
batch finished1463566705000 ms - (1,14)1463566705000 ms - (2,7)1463566705000 ms 
- (7,2)1463566705000 ms batch finished1463566710000 ms - (1,15)1463566710000 ms 
- (3,5)1463566710000 ms - (5,3)1463566710000 ms batch finished1463566715000 ms 
- (1,16)1463566715000 ms - (2,8)1463566715000 ms batch finished1463566720000 ms 
- (1,17)1463566720000 ms - (17,1)1463566720000 ms batch finished1463566725000 
ms - (1,18)1463566725000 ms - (2,9)1463566725000 ms - (3,6)1463566725000 ms 
batch finished1463566730000 ms - (1,19)1463566730000 ms - (19,1)1463566730000 
ms batch finished1463566735000 ms - (1,20)1463566735000 ms - 
(2,10)1463566735000 ms - (5,4)1463566735000 ms batch finished1463566740000 ms - 
(1,1) <------------------ count got reset1463566740000 ms - (3,7)1463566740000 
ms - (7,1) <------------------ count got reset1463566740000 ms batch 
finished1463566745000 ms - (11,2)1463566745000 ms - (1,2)1463566745000 ms - 
(2,11)1463566745000 ms batch finished1463566750000 ms - (23,1)1463566750000 ms 
- (1,3)1463566750000 ms batch finished1463566755000 ms - (1,4)1463566755000 ms 
- (2,12)1463566755000 ms - (3,8)1463566755000 ms batch finished

ThanksShekhar

Reply via email to