Re: RDD partition after calling mapToPair

2015-11-23 Thread Thúy Hằng
Thanks Cody,

I still have concerns about this.
What do you mean by saying the Spark direct stream doesn't have a default
partitioner? Could you please explain this a bit more?

When I assign 20 cores to 20 Kafka partitions, I expect each core to work
on one partition. Is that correct?

I still couldn't figure out how the RDD will be partitioned after the mapToPair
function. It would be great if you could briefly explain (or point me to some
documentation, I couldn't find any) how the shuffle works with mapToPair.

Thank you very much.
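
For context, below is a minimal sketch of the mapPartitions approach Cody
suggests in his reply (quoted below). It is only an illustration: extractKey is
a hypothetical helper, the usual imports (java.util, scala.Tuple2) are omitted
as in the rest of this thread, and it only gives the same totals as reduceByKey
if the producer really partitions the topic by that exact key, so that each key
lives in a single Kafka partition.

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(...);

    // Aggregate inside each Kafka partition instead of calling reduceByKey,
    // so no shuffle is triggered; each task reads exactly one Kafka partition.
    JavaPairDStream<String, Long> counts = messages.mapPartitionsToPair(records -> {
        Map<String, Long> sums = new HashMap<>();
        while (records.hasNext()) {
            String key = extractKey(records.next()._2());   // hypothetical JSON helper
            sums.merge(key, 1L, Long::sum);
        }
        List<Tuple2<String, Long>> out = new ArrayList<>();
        sums.forEach((k, v) -> out.add(new Tuple2<>(k, v)));
        return out;   // the Spark 1.x Java API expects an Iterable here
    });
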
On Nov 23, 2015 12:26 AM, "Cody Koeninger"  wrote:

> Spark direct stream doesn't have a default partitioner.
>
> If you know that you want to do an operation on keys that are already
> partitioned by kafka, just use mapPartitions or foreachPartition to avoid a
> shuffle.
>
> On Sat, Nov 21, 2015 at 11:46 AM, trung kien  wrote:
>
>> Hi all,
>>
>> I am having trouble understanding how the RDD will be partitioned after
>> calling the mapToPair function.
>> Could anyone give me more information about partitioning in this function?
>>
>> I have a simple application doing the following job:
>>
>> JavaPairInputDStream messages =
>> KafkaUtils.createDirectStream(...)
>>
>> JavaPairDStream stats = messages.mapToPair(JSON_DECODE)
>>
>> .reduceByKey(SUM);
>>
>> saveToDB(stats)
>>
>> I set up 2 workers (each dedicating 20 cores) for this job.
>> My Kafka topic has 40 partitions (I want each core to handle a partition),
>> and the messages sent to the queue are partitioned by the same key used in
>> the mapToPair function.
>> I'm using the default partitioner of both Kafka and Spark.
>>
>> Ideally, I shouldn't see data shuffled between cores in the mapToPair
>> stage, right?
>> However, in the Spark UI, I see that the "Locality Level" for this stage
>> is "ANY", which means data needs to be transferred.
>> Any comments on this?
>>
>> --
>> Thanks
>> Kien
>>
>
>


Re: Spark Streaming data checkpoint performance

2015-11-06 Thread Thúy Hằng
Hi all,

Could anyone help me with this? It's a bit urgent for me.
I'm very confused and curious about Spark data checkpoint performance. Is there
any detailed description of the checkpoint implementation I can look into?
Spark Streaming only takes sub-second to process 20K messages/sec, however
it takes 25 seconds for the checkpoint. Now my application has an average
latency of 30 seconds and it keeps increasing.


2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê <thuyhang...@gmail.com>:

> Thanks all, it would be great to have this feature soon.
> Do you know the release plan for 1.6?
>
> In addition to this, I still have a checkpoint performance problem.
>
> My code is simply like this:
>
>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
>     jssc.checkpoint("spark-data/checkpoint");
>     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(...);
>     JavaPairDStream<String, List> stats = messages.mapToPair(parseJson)
>         .reduceByKey(REDUCE_STATS)
>         .updateStateByKey(RUNNING_STATS);
>
>     stats.print();
>
>   Now I need to maintain about 800k keys; the stats here only count the
> number of occurrences per key.
>   While running, the cache dir is very small (about 50M). My questions are:
>
>   1/ A regular micro-batch takes about 800ms to finish, but every 10
> seconds, when the data checkpoint is running, the same size micro-batch
> takes 5 seconds to finish. Why is it so high? What kind of job runs during
> the checkpoint? And why does it keep increasing?
>
>   2/ When I change the data checkpoint interval, e.g. using:
>   stats.checkpoint(Durations.seconds(100)); // change to 100, default is 10
>
>   the checkpoint time keeps increasing significantly: the first checkpoint
> is 10s, the second 30s, the third 70s ... and it keeps increasing :)
>   Why is it so high when I increase the checkpoint interval?
>
> It seems that the default interval works more stably.
>
> On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atan...@adobe.com> wrote:
>
>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>
>> Left some comments on the JIRA and design doc.
>>
>> -adrian
>>
>> From: Shixiong Zhu
>> Date: Tuesday, November 3, 2015 at 3:32 AM
>> To: Thúy Hằng Lê
>> Cc: Adrian Tanase, "user@spark.apache.org"
>> Subject: Re: Spark Streaming data checkpoint performance
>>
>> "trackStateByKey" is about to be added in 1.6 to resolve the performance
>> issue of "updateStateByKey". You can take a look at
>> https://issues.apache.org/jira/browse/SPARK-2629 and
>> https://github.com/apache/spark/pull/9256
>>
>
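
For reference, the new API mentioned above was eventually released in Spark 1.6
under the name mapWithState. A rough Java sketch of a running count per key (the
Optional type and exact imports differ between Spark versions, so treat this as
an approximation rather than the exact 1.6 API):

    // Mapping function: (key, new value in this batch, running state) -> emitted record.
    Function3<String, Optional<Long>, State<Long>, Tuple2<String, Long>> mappingFunc =
        (key, value, state) -> {
            long sum = (value.isPresent() ? value.get() : 0L)
                     + (state.exists() ? state.get() : 0L);
            state.update(sum);
            return new Tuple2<>(key, sum);
        };

    // 'counts' is assumed to be a JavaPairDStream<String, Long>, e.g. the output
    // of reduceByKey above. Unlike updateStateByKey, the mapping function is only
    // invoked for keys that actually have new data in the current batch.
    JavaMapWithStateDStream<String, Long, Long, Tuple2<String, Long>> running =
        counts.mapWithState(StateSpec.function(mappingFunc));
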


Re: Spark Streaming data checkpoint performance

2015-11-06 Thread Thúy Hằng
Thanks Aniket,

I want to store the state in external storage, but that should come in a later
step, I think.
Basically, I have to use the updateStateByKey function to maintain the running
state (which requires checkpointing), and my bottleneck is now the data
checkpoint.

My pseudo code is like below:

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
    jssc.checkpoint("spark-data/checkpoint");
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(...);
    JavaPairDStream<String, List> stats = messages.mapToPair(parseJson)
        .reduceByKey(REDUCE_STATS)
        .updateStateByKey(RUNNING_STATS);

    JavaPairDStream<String, List> newData = stats.filter(NEW_STATS);

    newData.foreachRDD(rdd -> {
        rdd.foreachPartition(partition -> {
            // Store to external storage.
        });
    });

  Without using updateStateByKey, I only have the stats of the last
micro-batch.

Any advice on this?
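
To make the question concrete, here is a sketch of what I have in mind for the
store step (assuming for simplicity that each stat reduces to a single count;
the JDBC URL, table and upsert statement are hypothetical, and any store with
batch writes would do):

    newData.foreachRDD(rdd -> {
        rdd.foreachPartition(records -> {
            // One connection per partition, with batched writes for throughput.
            try (Connection conn = DriverManager.getConnection(JDBC_URL);              // hypothetical URL
                 PreparedStatement ps = conn.prepareStatement(
                         "REPLACE INTO stats (stat_key, stat_count) VALUES (?, ?)")) {  // hypothetical table
                while (records.hasNext()) {
                    Tuple2<String, Long> kv = records.next();
                    ps.setString(1, kv._1());
                    ps.setLong(2, kv._2());
                    ps.addBatch();
                }
                ps.executeBatch();
            }
        });
        return null;   // foreachRDD takes a Function<JavaPairRDD, Void> in current (1.5.x) releases
    });
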


2015-11-07 11:35 GMT+07:00 Aniket Bhatnagar <aniket.bhatna...@gmail.com>:

> Can you try storing the state (word count) in an external key value store?
>
> On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê <thuyhang...@gmail.com> wrote:
>
>> Hi all,
>>
>> Could anyone help me with this? It's a bit urgent for me.
>> I'm very confused and curious about Spark data checkpoint performance. Is
>> there any detailed description of the checkpoint implementation I can look into?
>> Spark Streaming only takes sub-second to process 20K messages/sec, however
>> it takes 25 seconds for the checkpoint. Now my application has an average
>> latency of 30 seconds and it keeps increasing.
>>
>>
>> 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê <thuyhang...@gmail.com>:
>>
>>> Thanks all, it would be great to have this feature soon.
>>> Do you know the release plan for 1.6?
>>>
>>> In addition to this, I still have a checkpoint performance problem.
>>>
>>> My code is simply like this:
>>>
>>>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
>>>     jssc.checkpoint("spark-data/checkpoint");
>>>     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(...);
>>>     JavaPairDStream<String, List> stats = messages.mapToPair(parseJson)
>>>         .reduceByKey(REDUCE_STATS)
>>>         .updateStateByKey(RUNNING_STATS);
>>>
>>>     stats.print();
>>>
>>>   Now I need to maintain about 800k keys; the stats here only count the
>>> number of occurrences per key.
>>>   While running, the cache dir is very small (about 50M). My questions are:
>>>
>>>   1/ A regular micro-batch takes about 800ms to finish, but every 10
>>> seconds, when the data checkpoint is running, the same size micro-batch
>>> takes 5 seconds to finish. Why is it so high? What kind of job runs during
>>> the checkpoint? And why does it keep increasing?
>>>
>>>   2/ When I change the data checkpoint interval, e.g. using:
>>>   stats.checkpoint(Durations.seconds(100)); // change to 100, default is 10
>>>
>>>   the checkpoint time keeps increasing significantly: the first checkpoint
>>> is 10s, the second 30s, the third 70s ... and it keeps increasing :)
>>>   Why is it so high when I increase the checkpoint interval?
>>>
>>> It seems that the default interval works more stably.
>>>
>>> On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atan...@adobe.com> wrote:
>>>
>>>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>>>
>>>> Left some comments on the JIRA and design doc.
>>>>
>>>> -adrian
>>>>
>>>> From: Shixiong Zhu
>>>> Date: Tuesday, November 3, 2015 at 3:32 AM
>>>> To: Thúy Hằng Lê
>>>> Cc: Adrian Tanase, "user@spark.apache.org"
>>>> Subject: Re: Spark Streaming data checkpoint performance
>>>>
>>>> "trackStateByKey" is about to be added in 1.6 to resolve the
>>>> performance issue of "updateStateByKey". You can take a look at
>>>> https://issues.apache.org/jira/browse/SPARK-2629 and
>>>> https://github.com/apache/spark/pull/9256
>>>>
>>>
>>


Re: Spark Streaming data checkpoint performance

2015-11-05 Thread Thúy Hằng
Thanks all, it would be great to have this feature soon.
Do you know the release plan for 1.6?

In addition to this, I still have a checkpoint performance problem.

My code is simply like this:

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
    jssc.checkpoint("spark-data/checkpoint");
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(...);
    JavaPairDStream<String, List> stats = messages.mapToPair(parseJson)
        .reduceByKey(REDUCE_STATS)
        .updateStateByKey(RUNNING_STATS);

    stats.print();

  Now I need to maintain about 800k keys; the stats here only count the
number of occurrences per key.
  While running, the cache dir is very small (about 50M). My questions are:

  1/ A regular micro-batch takes about 800ms to finish, but every 10
seconds, when the data checkpoint is running, the same size micro-batch
takes 5 seconds to finish. Why is it so high? What kind of job runs during
the checkpoint? And why does it keep increasing?

  2/ When I change the data checkpoint interval, e.g. using:
  stats.checkpoint(Durations.seconds(100)); // change to 100, default is 10

  the checkpoint time keeps increasing significantly: the first checkpoint
is 10s, the second 30s, the third 70s ... and it keeps increasing :)
  Why is it so high when I increase the checkpoint interval?

It seems that the default interval works more stably.

On Nov 4, 2015 9:08 PM, "Adrian Tanase" <atan...@adobe.com> wrote:

> Nice! Thanks for sharing, I wasn’t aware of the new API.
>
> Left some comments on the JIRA and design doc.
>
> -adrian
>
> From: Shixiong Zhu
> Date: Tuesday, November 3, 2015 at 3:32 AM
> To: Thúy Hằng Lê
> Cc: Adrian Tanase, "user@spark.apache.org"
> Subject: Re: Spark Streaming data checkpoint performance
>
> "trackStateByKey" is about to be added in 1.6 to resolve the performance
> issue of "updateStateByKey". You can take a look at
> https://issues.apache.org/jira/browse/SPARK-2629 and
> https://github.com/apache/spark/pull/9256
>


Re: Spark Streaming data checkpoint performance

2015-11-02 Thread Thúy Hằng
Hi Adrian,

Thanks for the information.

However, your two suggestions don't really work for me.

Accuracy is the most important aspect of my application, so keeping only
15-minute window stats or pruning out some of the keys is impossible for my
application.

I can change the checkpoint interval as you suggest; however, is there any
other Spark configuration for tuning the data checkpoint performance?

And just curious: technically, why does updateStateByKey need to be called for
every key (regardless of whether there are new occurrences or not)? Does Spark
have any plan to fix this?
I have 4M keys whose statistics need to be maintained, but only a few of them
change in each batch interval.

2015-11-02 22:37 GMT+07:00 Adrian Tanase <atan...@adobe.com>:

> You are correct, the default checkpointing interval is 10 seconds or your
> batch size, whichever is bigger. You can change it by calling
> .checkpoint(x) on your resulting Dstream.
>
> For the rest, you are probably keeping an “all time” word count that grows
> unbounded if you never remove words from the map. Keep in mind that
> updateStateByKey is called for every key in the state RDD, regardless if
> you have new occurrences or not.
>
> You should consider at least one of these strategies:
>
>    - run your word count on a windowed DStream (e.g. unique counts over
>      the last 15 minutes)
>        - your best bet here is reduceByKeyAndWindow with an inverse function
>    - make your state object more complicated and try to prune out words
>      with very few occurrences or that haven't been updated for a long time
>        - you can do this by emitting None from updateStateByKey (see the
>          sketch at the end of this message)
>
> Hope this helps,
> -adrian
>
> From: Thúy Hằng Lê
> Date: Monday, November 2, 2015 at 7:20 AM
> To: "user@spark.apache.org"
> Subject: Spark Streaming data checkpoint performance
>
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.seconds(2));
>
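
A rough Java sketch of the pruning strategy Adrian describes above (since this
thread uses the Java API, "emitting None" becomes returning Optional.absent(),
the Guava Optional used by the Spark 1.x Java API; the state class, the
30-minute threshold and the 'counts' input stream of per-batch counts are all
hypothetical):

    // State per key: running count plus the time of the last update.
    class WordState implements java.io.Serializable {
        long count;
        long lastUpdatedMs;
    }

    Function2<List<Long>, Optional<WordState>, Optional<WordState>> PRUNE_AND_UPDATE =
        (newValues, current) -> {
            long now = System.currentTimeMillis();
            WordState state = current.isPresent() ? current.get() : new WordState();
            if (!newValues.isEmpty()) {
                for (Long v : newValues) state.count += v;
                state.lastUpdatedMs = now;
            } else if (now - state.lastUpdatedMs > 30 * 60 * 1000L) {
                return Optional.absent();   // drop this key from the state RDD entirely
            }
            return Optional.of(state);
        };

    // 'counts' is a JavaPairDStream<String, Long> of per-batch counts.
    JavaPairDStream<String, WordState> stats = counts.updateStateByKey(PRUNE_AND_UPDATE);
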


Spark Streaming data checkpoint performance

2015-11-01 Thread Thúy Hằng
Hi Spark gurus,

I am evaluating Spark Streaming,

In my application I need to maintain cumulative statistics (e.g. the total
running word count), so I need to call the updateStateByKey function on
every micro-batch.

After setting this up, I see the following behavior:
 * The Processing Time is very high every 10 seconds - usually 5x
higher (which I guess is the data checkpointing job)
 * The Processing Time becomes higher and higher over time; after 10
minutes it's much higher than the batch interval, leading to a huge
Scheduling Delay and a lot of Active Batches in the queue.

My questions are:

 * Is this expected behavior? Is there any way to improve the
performance of data checkpointing?
 * How does data checkpointing in Spark Streaming work? Does it need
to load all previous checkpoint data in order to build the new one?

My job is very simple:

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(...);

    JavaPairDStream<String, List> stats = messages.mapToPair(parseJson)
        .reduceByKey(REDUCE_STATS)
        .updateStateByKey(RUNNING_STATS);

    stats.print();
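
For completeness, the two checkpoint-related calls involved here (not shown in
the snippet above; the interval value is only an example):

    // updateStateByKey requires a checkpoint directory to be set.
    jssc.checkpoint("spark-data/checkpoint");

    // The per-DStream checkpoint interval can also be set explicitly;
    // by default it is the batch interval or 10 seconds, whichever is bigger.
    stats.checkpoint(Durations.seconds(10));
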


Re: Using Spark for portfolio manager app

2015-09-25 Thread Thúy Hằng
Thanks all for the feedback so far.
I haven't decided which external storage will be used yet.
HBase is cool, but it requires Hadoop in production. I only have 3-4 servers
for the whole thing (I am thinking of a relational database for this, which
could be MariaDB, MemSQL or MySQL), but they are hard to scale.
I will try various approaches before making any decision.

In addition, using Spark Streaming, is there any way to write only new or
changed data to external storage after using updateStateByKey?
The foreachRDD function seems to loop over the whole state RDD (including
keys that haven't changed). I believe Spark Streaming must have a way to do
this, but I still couldn't find an example doing a similar job.
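
One approach I am considering (only a sketch; parseJson, REDUCE_STATS,
RUNNING_STATS and the Stats type are placeholders from earlier in this thread,
and the join does add a shuffle) is to join the running state back against the
per-batch aggregates, so that only keys seen in the current batch get written out:

    // Per-batch aggregates: contains only the keys that appeared in this batch.
    JavaPairDStream<String, Stats> batchStats =
        messages.mapToPair(parseJson).reduceByKey(REDUCE_STATS);

    // Full running state, updated every batch (requires checkpointing).
    JavaPairDStream<String, Stats> runningStats =
        batchStats.updateStateByKey(RUNNING_STATS);

    // Keep only the state entries whose key was present in this batch.
    JavaPairDStream<String, Stats> changed =
        runningStats.join(batchStats).mapValues(pair -> pair._1());

    changed.foreachRDD(rdd -> {
        rdd.foreachPartition(records -> {
            // write only the changed keys to the external store
        });
        return null;   // Function<RDD, Void> signature on Spark 1.5.x
    });
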


Re: Using Spark for portfolio manager app

2015-09-22 Thread Thúy Hằng
That's a great answer, Adrian.
I found a lot of information here. I have a direction for the application now;
I will try your suggestions :)

On Tuesday, September 22, 2015, Adrian Tanase <atan...@adobe.com> wrote:

>
> 1. reading from kafka has exactly-once guarantees - we are using it in
>    production today (with the direct receiver)
>    1. you will probably have 2 topics; loading both into spark and
>       joining / unioning as needed is not an issue
>    2. tons of optimizations you can do there, assuming everything else
>       works
> 2. for ad-hoc query I would say you absolutely need to look at
>    external storage
>    1. querying the DStream or Spark's RDDs directly should be done
>       mostly for aggregates/metrics, not by users
>    2. if you look at HBase or Cassandra for storage then 50k
>       writes/sec are not a problem at all, especially combined with a smart
>       client that does batch puts (like async hbase
>       <https://github.com/OpenTSDB/asynchbase>)
>    3. you could also consider writing the updates to another kafka
>       topic and have a different component that updates the DB, if you think
>       of other optimisations there (see the sketch at the end of this message)
> 3. by stats I assume you mean metrics (operational or business)
>    1. there are multiple ways to do this, however I would not encourage
>       you to query spark directly, especially if you need an archive/history
>       of your datapoints
>    2. we are using OpenTSDB (we already have a HBase cluster) +
>       Grafana for dashboarding
>    3. collecting the metrics is a bit hairy in a streaming app - we
>       have experimented with both accumulators and RDDs specific for metrics -
>       chose the RDDs that write to OpenTSDB using foreachRdd
>
> -adrian
>
> --
> *From:* Thúy Hằng Lê <thuyhang...@gmail.com>
> *Sent:* Sunday, September 20, 2015 7:26 AM
> *To:* Jörn Franke
> *Cc:* user@spark.apache.org
> *Subject:* Re: Using Spark for portfolio manager app
>
> Thanks Adrian and Jörn for the answers.
>
> Yes, you're right, there are a lot of things I need to consider if I want to
> use Spark for my app.
>
> I still have a few concerns/questions about your information:
>
> 1/ I need to combine the trading stream with the tick stream; I am planning
> to use Kafka for that.
> If I use approach #2 (Direct Approach) in this tutorial:
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> Will I get exactly-once semantics? Or do I have to add some logic in my
> code to achieve that?
> As you suggest using delta updates, exactly-once semantics are required
> for this application.
>
> 2/ For ad-hoc queries, I have to output from Spark to external storage and
> query that, right?
> Is there any way to do ad-hoc queries on Spark directly? My application
> could have 50k updates per second at peak time.
> Persisting to external storage leads to high latency in my app.
>
> 3/ How do I get real-time statistics from Spark?
> In most of the Spark Streaming examples, the statistics are echoed to
> stdout.
> However, I want to display those statistics in a GUI; is there any way to
> retrieve data from Spark directly without using external storage?
>
>
> 2015-09-19 16:23 GMT+07:00 Jörn Franke <jornfra...@gmail.com>:
>
>> If you want to be able to let your users query their portfolios, then you
>> may want to think about storing the current state of the portfolios in
>> HBase/Phoenix; alternatively, a cluster of relational databases can make
>> sense. For the rest you may use Spark.
>>
>> On Sat., Sep. 19, 2015 at 4:43, Thúy Hằng Lê <thuyhang...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I am going to build a financial application for a Portfolio Manager, where
>>> each portfolio contains a list of stocks, the number of shares purchased,
>>> and the purchase price.
>>> Another source of information is stock prices from market data. The
>>> application needs to calculate the real-time gain or loss of each stock in
>>> each portfolio (compared to the purchase price).
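
Regarding point 2.3 above (writing the updates to another Kafka topic and
letting a separate component update the DB), a minimal sketch using the Kafka
producer API - the topic name, broker list and the 'changedStats' stream of
(key, json) pairs are hypothetical, and in a real job the producer would
normally be pooled rather than created per partition:

    changedStats.foreachRDD(rdd -> {
        rdd.foreachPartition(records -> {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");   // hypothetical broker list
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                while (records.hasNext()) {
                    Tuple2<String, String> kv = records.next();
                    producer.send(new ProducerRecord<>("portfolio-updates", kv._1(), kv._2()));
                }
            }
        });
        return null;   // Function<RDD, Void> signature on Spark 1.x
    });
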

Using Spark for portfolio manager app

2015-09-18 Thread Thúy Hằng
Hi all,

I am going to build a financial application for a Portfolio Manager, where
each portfolio contains a list of stocks, the number of shares purchased,
and the purchase price.
Another source of information is stock prices from market data. The
application needs to calculate the real-time gain or loss of each stock in
each portfolio (compared to the purchase price).

I am new to Spark. I know that using Spark Streaming I can aggregate
portfolio positions in real-time, for example:
user A contains:
  - 100 IBM shares with transactionValue=$15000
  - 500 AAPL shares with transactionValue=$11400

Now, given that stock prices change in real-time too, e.g. if the IBM price
is at 151, I want to update its gain or loss: gainOrLoss(IBM) = 151*100 -
15000 = $100

My questions are:

 * What is the best method to combine 2 real-time streams (transactions
made by users and market pricing data) in Spark? (A sketch of what I have
in mind follows below.)
 * How can I run real-time ad-hoc SQL against the portfolio positions?
Is there any way I can do SQL on the output of Spark Streaming?
 For example,
  select sum(gainOrLoss) from portfolio where user='A';
 * What are the preferred external storages for Spark in this use case?
 * Is Spark the right choice for my use case?
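
Regarding the first question, a rough sketch of what I have in mind (topic
names, kafkaParams, the Position class and the two parser functions are
hypothetical; the join is keyed by stock symbol and happens per micro-batch,
before any running state is maintained):

    // Two direct Kafka streams, one per topic.
    JavaPairInputDStream<String, String> transactions = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, Collections.singleton("transactions"));
    JavaPairInputDStream<String, String> prices = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, Collections.singleton("market-prices"));

    // Key both streams by stock symbol, then join them within each micro-batch.
    JavaPairDStream<String, Position> positionsBySymbol = transactions.mapToPair(parsePosition);
    JavaPairDStream<String, Double> priceBySymbol = prices.mapToPair(parsePrice);

    JavaPairDStream<String, Tuple2<Position, Double>> joined = positionsBySymbol.join(priceBySymbol);

    // gainOrLoss = currentPrice * shares - transactionValue
    JavaPairDStream<String, Double> gainOrLoss =
        joined.mapValues(pv -> pv._2() * pv._1().shares - pv._1().transactionValue);
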