[Spark 2.0 state store] Streaming wordcount using spark state store

2016-05-18 Thread Shekhar Bansal
Hi,

What is the right way to use the Spark 2.0 state store feature in Spark Streaming? I referred to the test cases in this pull request (https://github.com/apache/spark/pull/11645/files) and implemented word count using the state store. My source is Kafka (1 topic, 10 partitions), and my data pump pushes numbers into random partitions. I understand that the state store maintains state per partition, so I apply partitionBy before calling mapPartitionsWithStateStore.

The problem I am facing is that after some time I start getting wrong running counts. My data pump pushes the number 1 every 5 seconds, which is the same as the micro-batch duration. The first 20 micro-batches ran fine, but in the 21st micro-batch the state of 1 somehow got reset and I got count=1; please see the console output below.
Code of my streaming app:

    val keySchema = StructType(Seq(StructField("key", StringType, true)))
    val valueSchema = StructType(Seq(StructField("value", IntegerType, true)))

    val stateStoreCheckpointPath = "/data/spark/stateStoreCheckpoints/"
    var stateStoreVersion: Long = 0

    val stateStoreWordCount = (store: StateStore, iter: Iterator[String]) => {
      val out = new ListBuffer[(String, Int)]
      iter.foreach { s =>
        val current = store.get(stringToRow(s)).map(rowToInt).getOrElse(0) + 1
        store.put(stringToRow(s), intToRow(current))
        out.append((s, current))
      }
      store.commit()
      out.iterator
    }

    val opId = 100
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      .flatMap(r => r._2.split(" "))
      .foreachRDD((rdd, time) => {
        rdd
          .map(r => (r, null))
          .partitionBy(new HashPartitioner(20))
          .map(r => r._1)
          .mapPartitionsWithStateStore(sqlContext, stateStoreCheckpointPath, opId,
            storeVersion = stateStoreVersion, keySchema, valueSchema)(stateStoreWordCount)
          .collect().foreach(r => println(time + " - " + r))
        stateStoreVersion += 1
        println(time + " batch finished")
      })
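For debugging, a small variant of the update function might help narrow this down. This is a sketch only: it assumes that store.commit() returns the newly committed store version, as in the StateStore trait from the PR above, and it uses TaskContext.getPartitionId() to identify the partition.

    import org.apache.spark.TaskContext

    val stateStoreWordCountDebug = (store: StateStore, iter: Iterator[String]) => {
      val out = new ListBuffer[(String, Int)]
      iter.foreach { s =>
        val current = store.get(stringToRow(s)).map(rowToInt).getOrElse(0) + 1
        store.put(stringToRow(s), intToRow(current))
        out.append((s, current))
      }
      // Assumption: commit() returns the new store version. Printing it together
      // with the partition id shows, batch by batch, whether any partition's
      // committed version drifts from the driver-side stateStoreVersion counter.
      val committedVersion = store.commit()
      println(s"partition=${TaskContext.getPartitionId()} committedVersion=$committedVersion")
      out.iterator
    }

If the versions printed by the executors stop matching the driver-side counter around batch 21, that would point at the version bookkeeping rather than the store contents.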
Code of my data pump:

    val list = List(1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97)
    while (true) {
      list.foreach(r => {
        if (count % r == 0) {
          val productRecord = new ProducerRecord(Configs.topic, new Random().nextInt(10), "", r.toString)
          producer.send(productRecord)
        }
      })
      count += 1
      Thread.sleep(5000)
    }

The complete code is available here:
https://github.com/zuxqoj/HelloWorld/tree/master/SparkStreamingStateStore/src/main/scala/spark/streaming/statestore/test

I am using Spark on YARN in client mode.

    spark  - spark-2.0.0-snapshot (git sha - 6c5768594fe8b910125f06e1308a8154a199447e) - May 13, 2016
    scala  - 2.10.2
    java   - 1.8
    hadoop - 2.7.1
    kafka  - 0.8.2.1

Spark config:

    spark.executor.cores=12
    spark.executor.instances=6


Console output:

    1463566640000 ms - (1,1)
    1463566640000 ms batch finished
    1463566645000 ms - (1,2)
    1463566645000 ms - (2,1)
    1463566645000 ms batch finished
    1463566650000 ms - (1,3)
    1463566650000 ms - (3,1)
    1463566650000 ms batch finished
    1463566655000 ms - (1,4)
    1463566655000 ms - (2,2)
    1463566655000 ms batch finished
    1463566660000 ms - (1,5)
    1463566660000 ms - (5,1)
    1463566660000 ms batch finished
    1463566665000 ms - (1,6)
    1463566665000 ms - (2,3)
    1463566665000 ms - (3,2)
    1463566665000 ms batch finished
    1463566670000 ms - (1,7)
    1463566670000 ms - (7,1)
    1463566670000 ms batch finished
    1463566675000 ms - (1,8)
    1463566675000 ms - (2,4)
    1463566675000 ms batch finished
    1463566680000 ms - (1,9)
    1463566680000 ms - (3,3)
    1463566680000 ms batch finished
    1463566685000 ms - (1,10)
    1463566685000 ms - (2,5)
    1463566685000 ms - (5,2)
    1463566685000 ms batch finished
    1463566690000 ms - (11,1)
    1463566690000 ms - (1,11)
    1463566690000 ms batch finished
    1463566695000 ms - (1,12)
    1463566695000 ms - (2,6)
    1463566695000 ms - (3,4)
    1463566695000 ms batch finished
    1463566700000 ms - (1,13)
    1463566700000 ms - (13,1)
    1463566700000 ms batch finished
    1463566705000 ms - (1,14)
    1463566705000 ms - (2,7)
    1463566705000 ms - (7,2)
    1463566705000 ms batch finished
    1463566710000 ms - (1,15)
    1463566710000 ms - (3,5)
    1463566710000 ms - (5,3)
    1463566710000 ms batch finished
    1463566715000 ms - (1,16)
    1463566715000 ms - (2,8)
    1463566715000 ms batch finished
    1463566720000 ms - (1,17)
    1463566720000 ms - (17,1)
    1463566720000 ms batch finished
    1463566725000 ms - (1,18)
    1463566725000 ms - (2,9)
    1463566725000 ms - (3,6)
    1463566725000 ms batch finished
    1463566730000 ms - (1,19)
    1463566730000 ms - (19,1)
    1463566730000 ms batch finished
    1463566735000 ms - (1,20)
    1463566735000 ms - (2,10)
    1463566735000 ms - (5,4)
    1463566735000 ms batch finished
    1463566740000 ms - (1,1)   <-- count got reset
    1463566740000 ms - (3,7)
    1463566740000 ms - (7,1)   <-- count got reset
    1463566740000 ms batch finished
    1463566745000 ms - (11,2)
    1463566745000 ms - (1,2)
    1463566745000 ms - (2,11)
    1463566745000 ms batch finished
    1463566750000 ms - (23,1)
    1463566750000 ms - (1,3)
    1463566750000 ms batch finished
    1463566755000 ms -
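For what it's worth, assuming one pump iteration per micro-batch (which the first 20 batches are consistent with), the expected running count of a number r after batch n is simply n / r with integer division: after batch 20 that gives (1,20), (2,10), (5,4), exactly what was printed, and after batch 21 it should give (1,21), (3,7), (7,3). The console shows (3,7) as expected, but the counts for 1 and 7 dropped back to 1.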

Spark Streaming share state between two streams

2016-04-08 Thread Shekhar Bansal
Hi,

Can we share Spark Streaming state between two DStreams? Basically, I want to build state from the first stream and enrich the second stream using that state.

Example: I have modified the StatefulNetworkWordCount example. I am creating state from the first stream and enriching the second stream with the counts from the first stream.

val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))


val mappingFuncForFirstStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  val output = (word, sum)
  state.update(sum)

  Some(output)
}

val mappingFuncForSecondStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
  val sum = state.getOption.getOrElse(0)
  val output = (word, sum)

  Some(output)
}



// first stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
  .flatMap(r=>r._2.split(" "))
  .map(x => (x, 1))
  .mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10)))
  .print(1)



// second stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams2, mergeTopicSet)
  .flatMap(r=>r._2.split(" "))
  .map(x => (x, 1))
  .mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10)))
  .print(50)
In the checkpointing directory, I can see two different state RDDs.

I am using spark-1.6.1 and kafka-0.8.2.1.
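For comparison, here is a minimal sketch (illustrative only; firstStream and secondStream are placeholders for the two createDirectStream DStreams above) of driving both streams through a single mapWithState by tagging each record with its source and unioning the streams, so that only one state RDD exists and the second stream merely reads the counts:

    // Sketch only: firstStream / secondStream stand in for the two Kafka direct streams above.
    sealed trait Source
    case object FromFirst extends Source
    case object FromSecond extends Source

    val sharedMappingFunc = (batchTime: Time, word: String, src: Option[Source], state: State[Int]) => {
      val current = state.getOption.getOrElse(0)
      src match {
        case Some(FromFirst) =>
          // only records from the first stream update the running count
          val sum = current + 1
          state.update(sum)
          Some((word, sum))
        case _ =>
          // records from the second stream are only enriched with the current count
          Some((word, current))
      }
    }

    val taggedFirst  = firstStream.flatMap(r => r._2.split(" ")).map(w => (w, FromFirst: Source))
    val taggedSecond = secondStream.flatMap(r => r._2.split(" ")).map(w => (w, FromSecond: Source))

    taggedFirst.union(taggedSecond)
      .mapWithState(StateSpec.function(sharedMappingFunc).initialState(initialRDD).timeout(Minutes(10)))
      .print()

The trade-off is that counted and enriched records come out interleaved in one DStream and have to be separated downstream.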
Regards,
Shekhar

Re: --executor-cores cannot change vcores in yarn?

2014-11-03 Thread Shekhar Bansal
If you are using the capacity scheduler in YARN: by default, the YARN capacity
scheduler uses DefaultResourceCalculator, which considers only memory when
allocating containers. You can use DominantResourceCalculator instead; it
considers both memory and CPU.

In capacity-scheduler.xml, set

yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
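For reference, the corresponding entry in capacity-scheduler.xml would look roughly like this:

    <property>
      <name>yarn.scheduler.capacity.resource-calculator</name>
      <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
    </property>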


On 04/11/14 3:03 am, Gen gen.tan...@gmail.com wrote:

Hi,

Well, I couldn't find the original documentation, but according to
http://qnalist.com/questions/2791828/about-the-cpu-cores-and-cpu-usage,
the vcores are not physical CPU cores but virtual cores.
I also used the top command to monitor CPU utilization during the Spark job;
Spark can use all CPUs even if I leave --executor-cores at its default (1).

Hope that helps.
Cheers,
Gen


Gen wrote:
 Hi,

 Maybe it is a stupid question, but I am running Spark on YARN. I request
 the resources with the following command:
 {code}
 ./spark-submit --master yarn-client --num-executors #number of workers
 --executor-cores #number of cores ...
 {code}
 However, after launching the task, I use

 yarn node -status ID

 to monitor the state of the cluster. It shows that the number of vcores
 used for each container is always 1, no matter what number I pass via
 --executor-cores.
 Any ideas how to solve this problem? Thanks a lot in advance for your
 help.

 Cheers
 Gen




