The state store for structured streaming is an internal concept and isn't
designed to be consumed by end users. I'm hoping to write some
documentation on how to do aggregation, but support for reading from Kafka
and other sources will likely come in Spark 2.1+.

On Wed, May 18, 2016 at 3:50 AM, Shekhar Bansal <
shekhar0...@yahoo.com.invalid> wrote:

> Hi
>
> What is the right way to use the Spark 2.0 state store feature in Spark
> Streaming?
> I referred to the test cases in this pull request
> (https://github.com/apache/spark/pull/11645/files) and implemented word
> count using the state store.
> My source is Kafka (1 topic, 10 partitions). My data pump pushes each
> number into a random partition.
> I understand that the state store maintains state per partition, so I
> apply partitionBy before calling mapPartitionsWithStateStore.
>
> The problem I am facing is that after some time I start getting a wrong
> running count.
> My data pump pushes the number 1 every 5 seconds, which is the same as the
> microbatch duration. The first 20 microbatches ran fine, but in the 21st
> microbatch the state of 1 somehow got reset and I got count=1; please see
> the console output.
>
> Code of my streaming app
>     val keySchema = StructType(Seq(StructField("key", StringType, true)))
>     val valueSchema = StructType(Seq(StructField("value", IntegerType, true)))
>
>     val stateStoreCheckpointPath = "/data/spark/stateStoreCheckpoints/"
>     var stateStoreVersion:Long = 0
>
>     val stateStoreWordCount = (store: StateStore, iter: Iterator[String]) => {
>       val out = new ListBuffer[(String, Int)]
>       iter.foreach { s =>
>         val current = store.get(stringToRow(s)).map(rowToInt).getOrElse(0) + 1
>         store.put(stringToRow(s), intToRow(current))
>         out.append((s,current))
>       }
>
>       store.commit
>       out.iterator
>     }
>
>     val opId = 100
>     KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>         ssc, kafkaParams, topicsSet)
>       .flatMap(r=>{r._2.split(" ")})
>       .foreachRDD((rdd, time) =>{
>         rdd
>         .map(r=>(r,null))
>         .partitionBy(new HashPartitioner(20))
>         .map(r=>r._1)
>         .mapPartitionsWithStateStore(sqlContet, stateStoreCheckpointPath,
>           opId, storeVersion = stateStoreVersion, keySchema,
>           valueSchema)(stateStoreWordCount)
>         .collect.foreach(r => println(time + " - " + r))
>         stateStoreVersion+=1
>         println(time + " batch finished")
>         }
>       )
>
> Code of my Data pump
>     val list = List(1,2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,
>       67,71,73,79,83,89,97)
>     while (true) {
>       list.foreach(r=>{
>         if(count%r==0){
>           val productRecord = new ProducerRecord(Configs.topic,
>             new Random().nextInt(10), "", r.toString)
>           producer.send(productRecord)
>         }
>       })
>       count+=1
>       Thread.sleep(5000);
>     }
>
>
> Complete code is available here:
> https://github.com/zuxqoj/HelloWorld/tree/master/SparkStreamingStateStore/src/main/scala/spark/streaming/statestore/test
>
>
> I am using spark on yarn in client mode.
> spark - spark-2.0.0-snapshot (git sha -
> 6c5768594fe8b910125f06e1308a8154a199447e)  - May 13, 2016
> scala - 2.10.2
> java - 1.8
> hadoop - 2.7.1
> kafka - 0.8.2.1
>
> Spark config:
> spark.executor.cores=12
> spark.executor.instances=6
>
>
> Console Output
> 1463566640000 ms - (1,1)
> 1463566640000 ms batch finished
> 1463566645000 ms - (1,2)
> 1463566645000 ms - (2,1)
> 1463566645000 ms batch finished
> 1463566650000 ms - (1,3)
> 1463566650000 ms - (3,1)
> 1463566650000 ms batch finished
> 1463566655000 ms - (1,4)
> 1463566655000 ms - (2,2)
> 1463566655000 ms batch finished
> 1463566660000 ms - (1,5)
> 1463566660000 ms - (5,1)
> 1463566660000 ms batch finished
> 1463566665000 ms - (1,6)
> 1463566665000 ms - (2,3)
> 1463566665000 ms - (3,2)
> 1463566665000 ms batch finished
> 1463566670000 ms - (1,7)
> 1463566670000 ms - (7,1)
> 1463566670000 ms batch finished
> 1463566675000 ms - (1,8)
> 1463566675000 ms - (2,4)
> 1463566675000 ms batch finished
> 1463566680000 ms - (1,9)
> 1463566680000 ms - (3,3)
> 1463566680000 ms batch finished
> 1463566685000 ms - (1,10)
> 1463566685000 ms - (2,5)
> 1463566685000 ms - (5,2)
> 1463566685000 ms batch finished
> 1463566690000 ms - (11,1)
> 1463566690000 ms - (1,11)
> 1463566690000 ms batch finished
> 1463566695000 ms - (1,12)
> 1463566695000 ms - (2,6)
> 1463566695000 ms - (3,4)
> 1463566695000 ms batch finished
> 1463566700000 ms - (1,13)
> 1463566700000 ms - (13,1)
> 1463566700000 ms batch finished
> 1463566705000 ms - (1,14)
> 1463566705000 ms - (2,7)
> 1463566705000 ms - (7,2)
> 1463566705000 ms batch finished
> 1463566710000 ms - (1,15)
> 1463566710000 ms - (3,5)
> 1463566710000 ms - (5,3)
> 1463566710000 ms batch finished
> 1463566715000 ms - (1,16)
> 1463566715000 ms - (2,8)
> 1463566715000 ms batch finished
> 1463566720000 ms - (1,17)
> 1463566720000 ms - (17,1)
> 1463566720000 ms batch finished
> 1463566725000 ms - (1,18)
> 1463566725000 ms - (2,9)
> 1463566725000 ms - (3,6)
> 1463566725000 ms batch finished
> 1463566730000 ms - (1,19)
> 1463566730000 ms - (19,1)
> 1463566730000 ms batch finished
> 1463566735000 ms - (1,20)
> 1463566735000 ms - (2,10)
> 1463566735000 ms - (5,4)
> 1463566735000 ms batch finished
> 1463566740000 ms - (1,1) *<------------------ count got reset*
> 1463566740000 ms - (3,7)
> 1463566740000 ms - (7,1) *<------------------ count got reset*
> 1463566740000 ms batch finished
> 1463566745000 ms - (11,2)
> 1463566745000 ms - (1,2)
> 1463566745000 ms - (2,11)
> 1463566745000 ms batch finished
> 1463566750000 ms - (23,1)
> 1463566750000 ms - (1,3)
> 1463566750000 ms batch finished
> 1463566755000 ms - (1,4)
> 1463566755000 ms - (2,12)
> 1463566755000 ms - (3,8)
> 1463566755000 ms batch finished
>
>
> Thanks
> Shekhar
>
