updateStateByKey performance / API

2015-03-18 Thread Nikos Viorres
Hi all,

We are having a few issues with the performance of the updateStateByKey
operation in Spark Streaming (1.2.1 at the moment), and any advice would be
greatly appreciated. Specifically, on each tick of the system (which is set
to 10 seconds) we need to update a state tuple where the key is the user_id
and the value is an object holding some state about the user. The problem is
that, even with Kryo serialization, for 5M users this gets so slow that we
have to increase the batch interval beyond 10 seconds in order not to fall
behind.
The input to the streaming job is a Kafka stream consisting of key-value
pairs of user_ids and action codes; we join this against our checkpointed
state by key and update the state.
I understand that the reason for iterating over the whole state set is to
evict items or to update every key's state for time-dependent computations,
but neither applies to our situation, and it hurts performance badly.
Is there any possibility of adding an extra call to the API in the future
for updating only a specific subset of keys?
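
For reference, a minimal sketch of what we currently do on each batch
(simplified; the state class, Kafka parameters and update logic below are
illustrative placeholders, not our real code):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._
  import org.apache.spark.streaming.kafka.KafkaUtils

  case class UserState(lastAction: Int, events: Long)   // placeholder per-user state

  val conf = new SparkConf().setAppName("user-state")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("hdfs:///tmp/user-state-checkpoint")    // illustrative path

  // (user_id, action_code) pairs from Kafka; connection details are illustrative
  val actions = KafkaUtils.createStream(ssc, "zk1:2181", "user-state", Map("actions" -> 1))
    .map { case (userId, action) => (userId, action.toInt) }

  // updateStateByKey calls the update function for *every* key in the state on
  // each batch, even keys with no new events, which is what becomes slow for 5M users
  val state = actions.updateStateByKey[UserState] { (newActions: Seq[Int], prev: Option[UserState]) =>
    val s = prev.getOrElse(UserState(0, 0L))
    Some(UserState(newActions.lastOption.getOrElse(s.lastAction), s.events + newActions.size))
  }

  state.print()
  ssc.start()
  ssc.awaitTermination()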

P.S. I will try setting the DStream to non-serialized storage as soon as possible,
but then I am worried about GC and checkpointing performance.


Re: Idempotent count

2015-03-18 Thread Arush Kharbanda
Hi

Yes, Spark Streaming is capable of stateful stream processing; with or
without state is one way of classifying streaming computations.
Checkpoints hold both metadata and data.
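
For example, checkpointing is enabled roughly like this; on restart the
context and its saved state are rebuilt from the checkpoint directory (a
rough sketch only, with an illustrative path, source and batch interval):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._

  val checkpointDir = "hdfs:///checkpoints/stateful-count"   // illustrative path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("stateful-count")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)   // checkpoints store both metadata and state data
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1L))
    // running count per word, kept in checkpointed state
    val counts = words.updateStateByKey[Long] { (batch: Seq[Long], prev: Option[Long]) =>
      Some(prev.getOrElse(0L) + batch.sum)
    }
    counts.print()
    ssc
  }

  // Restores the previous context (and its state) if a checkpoint exists,
  // otherwise creates a fresh one
  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()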

Thanks


On Wed, Mar 18, 2015 at 4:00 AM, Binh Nguyen Van binhn...@gmail.com wrote:

 Hi all,

 I am new to Spark, so please forgive me if my questions are stupid.
 I am trying to use Spark Streaming in an application that reads data
 from a queue (Kafka), does some aggregation (sum, count, ...), and
 then persists the result to an external storage system (MySQL, VoltDB, ...).

 From my understanding of Spark Streaming, there are two ways
 of doing the aggregation:

- Stateless: I don't keep state and just apply the new delta values to the
external system. From my understanding, doing it this way I may end up
over-counting when there is a failure and a replay.
- Stateful: Use checkpointing to keep state and blindly save the new state
to the external system (a rough sketch follows below). Doing it this way I get
a correct aggregation result, but I have to keep the data in two places (the
state and the external system).
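
 A rough sketch of what I mean by blindly saving the new state (the JDBC
 connection and table here are made up just for illustration; counts would
 be the stateful DStream of (key, running total)):

  // counts: DStream[(String, Long)] of absolute running totals (e.g. from updateStateByKey)
  counts.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      // hypothetical MySQL table: agg(k VARCHAR PRIMARY KEY, total BIGINT)
      val conn = java.sql.DriverManager.getConnection("jdbc:mysql://host/db", "user", "pass")
      val stmt = conn.prepareStatement(
        "INSERT INTO agg (k, total) VALUES (?, ?) ON DUPLICATE KEY UPDATE total = VALUES(total)")
      partition.foreach { case (k, total) =>
        stmt.setString(1, k)
        stmt.setLong(2, total)
        stmt.executeUpdate()
      }
      stmt.close()
      conn.close()
    }
  }
  // A replayed batch just rewrites the same absolute values, so there is no over-counting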

 My questions are:

- Is my understanding of stateless and stateful aggregation correct?
If not, please correct me!
- For the stateful aggregation, what does Spark Streaming keep when
it saves a checkpoint?

 Please kindly help!

 Thanks
 -Binh




-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


RE: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Shao, Saisai
Could you please check your driver log or the streaming web UI to see each job's
latency, including both the processing latency and the total latency?
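
If it helps, you can also log those delays from the driver with a
StreamingListener (a rough sketch, assuming ssc is your StreamingContext):

  import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

  ssc.addStreamingListener(new StreamingListener {
    override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
      val info = batch.batchInfo
      // processingDelay: time spent running the batch's jobs; totalDelay: scheduling + processing
      println(s"batch ${info.batchTime}: processing=${info.processingDelay.getOrElse(-1L)} ms, " +
        s"total=${info.totalDelay.getOrElse(-1L)} ms")
    }
  })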

From your code it seems the slide interval is only 3 seconds, so you recompute a
full 60 seconds of data every 3 seconds. If the processing latency is larger than
the slide interval, the batches will queue up, and perhaps your computation power
simply cannot reach the qps you want.

I think you need to identify the bottleneck first, and then try to tune your
code, balance the data, or add more computation resources.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 1:39 PM
To: user@spark.apache.org
Subject: [spark-streaming] can shuffle write to disk be disabled?

I use Spark Streaming to read messages from Kafka; the producer creates about
1500 messages per second.


 // imports assumed by the snippet
 import java.util.Date
 import scala.collection.immutable.HashSet
 import scala.util.hashing.MurmurHash3
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Seconds
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.kafka.KafkaUtils

 def hash(x: String): Int = {
   MurmurHash3.stringHash(x)
 }

 val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
   StorageLevel.MEMORY_ONLY_SER).map(_._2)

 val clickstream = stream.map(log => {
   // parse log
   ...
   (hash(log.url), HashSet(hash(log.userid)))
 }).window(Seconds(60), Seconds(3))

 val upv = clickstream.transform(rdd => rdd.reduceByKey(_ ++ _).map {
   case (url, visits) => {
     val uv = visits.size
     (uv, url)
   }
 })

 upv.foreach(rdd => println(new Date() +
   "\n---\n" + rdd.top(20).mkString("\n") + "\n"))



It is quite quick upon startup, but after running for a few minutes it gets
slower and slower, and the latency can reach minutes.

I found several gigabytes of shuffle writes under /tmp/spark-.



With 1500 messages per second and a window size of 60 seconds (roughly 90,000
records per window), I think it should all be done in memory without writing to
disk at all.

I've set executor-memory to 8G, so there is plenty of memory.



$SPARK_HOME/bin/spark-submit \
  --class SimpleApp \
  --master spark://localhost:7077 \
  --driver-memory 16G \
  --executor-memory 8G \
  target/scala-2.10/simple-assembly-1.0.jar



I also tried these settings, but it still spills to disk.



spark.master spark://localhost:7077
#spark.driver.memory  4g
#spark.shuffle.file.buffer.kb 4096
#spark.shuffle.memoryFraction 0.8
#spark.storage.unrollFraction 0.8
#spark.storage.unrollMemoryThreshold 1073741824
spark.io.compression.codec   lz4
spark.shuffle.spill  false
spark.serializer org.apache.spark.serializer.KryoSerializer



Where am I wrong?


Re: HIVE SparkSQL

2015-03-18 Thread Jörn Franke
Hello,

Depending on your needs, a search technology such as SolrCloud or
ElasticSearch may make more sense. If you go for the Cassandra solution,
you can use the Lucene text indexer...
I am not sure Hive or Spark SQL are very suitable for text. However, if
you do not need text search, feel free to go with them.
What kind of statistics / aggregates do you want to get out of your logs?

Best regards
On 18 March 2015 at 04:29, 宫勐 shadowinl...@gmail.com wrote:

 Hi,

 I need to migrate a log analysis system from MySQL plus a C++ real-time
 compute framework to the Hadoop ecosystem.

 For building the data warehouse, I don't know which one is the
 right choice: Cassandra? Hive? Or just Spark SQL?

 There are few benchmarks for these systems.

 My scenario is as follows:

 Every 5 seconds, Flume will transfer a log file from an IDC. The log
 file is pre-formatted to suit a MySQL load event. There are many IDCs, and they
 will shut down or reconnect to Flume at random.

 Every online IDC must receive an analysis of its logs every 5 minutes.

 Any suggestions?

 Thanks
 Yours
 Meng


