Re: Using Spark for portfolio manager app
Thanks all for the feedback so far. I haven't decided which external storage to use yet. HBase is cool, but it requires Hadoop in production, and I only have 3-4 servers for the whole thing. I am thinking of a relational database for this (MariaDB, MemSQL, or MySQL), but they are hard to scale. I will try various approaches before making any decision. In addition, with Spark Streaming, is there any way to write only new data to external storage after using updateStateByKey? The foreachRDD function seems to loop over the whole state (including keys that haven't changed). I believe Spark Streaming must have a way to do this, but I still couldn't find an example doing a similar job.
Re: Using Spark for portfolio manager app
Just use the official connector from DataStax: https://github.com/datastax/spark-cassandra-connector

Your solution is very similar. Let's assume the state is

  case class UserState(amount: Int, updates: Seq[Int])

and your user has 100. If the user does not see an update, you emit Some(UserState(100, Seq.empty)); otherwise you might emit Some(UserState(130, List(50, -20))). You can then process only the updated users like this:

  usersState.filter(_.updates.nonEmpty).foreachRDD { ... }

Regarding optimizations, I would not worry too much about it. Going through users with no updates is most likely a no-op. Spark HAS to iterate through all the state objects since it does not operate with deltas from one batch to the next – the StateDStream is really the whole app state packed as an RDD. You could look at one of the other updateStateByKey overloads – maybe you can write more efficient code there:

  def updateStateByKey[S: ClassTag](
      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
      partitioner: Partitioner,
      rememberPartitioner: Boolean
    ): DStream[(K, S)] = ...

What you can do, though (and here you'll be glad that Spark also executes the code for state objects without updates), is clean up users if they haven't received updates for a long time, then load their state from the DB the next time you see them. I would consider this a must-have optimization to keep some bounds on the memory needs.

-adrian

From: Thúy Hằng Lê
Date: Friday, September 25, 2015 at 2:05 PM
To: Adrian Tanase
Subject: Re: Using Spark for portfolio manager app

Hi Adrian,

Thanks, Cassandra seems to be a good candidate too. I will give it a try. Do you know any stable connector that helps Spark work with Cassandra, or should I write it myself?

Regarding my second question, I think I have figured out another solution: I will append a flag (like isNew) to the tuple in the updateStateByKey function, then use filter to know which records I should write to the database.
But it would be great if you could share your solution too (I don't quite get the idea of emitting a new tuple). In addition, regarding Spark's design, it seems it has to iterate over all keys (including ones that haven't changed) to do the aggregation for each batch. In my use case I have 3M keys, but only 2-3K change in each batch (every 1 second). Is there any way to optimize this process?
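Adrian's eviction suggestion above (drop state for users with no recent updates, and reload from the DB the next time they appear) is the standard way to keep 3M keys bounded in memory. A minimal plain-Python model of the idea — the state shape `(amount, last_seen_batch)` and the idle threshold are assumptions, and the real version would live inside the updateStateByKey function:

```python
# Sketch of TTL-style state eviction: each batch, keys with updates are
# refreshed; keys idle longer than IDLE_BATCHES are dropped so the
# in-memory state stays bounded (they would be re-read from the DB on
# their next update).
IDLE_BATCHES = 3

def update_state(state, updates, batch_no):
    # Apply this batch's updates, tracking when each key was last touched.
    for key, delta in updates:
        amount, _ = state.get(key, (0, batch_no))
        state[key] = (amount + delta, batch_no)
    # Evict keys that have not seen an update recently.
    return {k: (amt, seen) for k, (amt, seen) in state.items()
            if batch_no - seen <= IDLE_BATCHES}

state = {}
state = update_state(state, [("userA", 100), ("userB", 50)], batch_no=0)
for batch in range(1, 5):
    state = update_state(state, [("userA", 10)], batch_no=batch)
# userB was last seen at batch 0 and is evicted by batch 4; userA stays.
```

With 3M keys and only 2-3K updates per batch, most of the memory cost is in long-idle keys, which is exactly what this bounds.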
Re: Using Spark for portfolio manager app
Re: DB – I strongly encourage you to look at Cassandra. It's almost as powerful as HBase, and a lot easier to set up and manage. It is well suited for this type of use case, with a combination of K/V store and time-series data.

For the second question, I've used this pattern all the time for "flash messages" – passing info as a one-time message downstream:

  * In your updateStateByKey function, emit a tuple of (actualNewState, changedData)
  * Then filter this on !changedData.isEmpty or something similar
  * And only do foreachRDD on the filtered stream.

Makes sense?

-adrian
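The three steps above can be modeled without Spark. A small Python sketch of the flash-message pattern — the record shapes and sample numbers are illustrative, not from the thread:

```python
# Model of the pattern: the update step emits (new_state, changed_data)
# per key; the sink then only processes keys whose changed_data is
# non-empty, mimicking filter(!changedData.isEmpty) + foreachRDD.
def update_step(prev_states, batch_updates):
    out = {}
    for key, prev in prev_states.items():
        deltas = batch_updates.get(key, [])
        out[key] = (prev + sum(deltas), deltas)  # (actualNewState, changedData)
    return out

prev = {"userA": 100, "userB": 70, "userC": 5}
result = update_step(prev, {"userA": [50, -20]})

# Only changed keys reach the external store.
to_write = {k: state for k, (state, deltas) in result.items() if deltas}
```

The state still carries every key (Spark keeps the whole app state), but the write path only sees the keys that actually changed in this batch.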
Re: Using Spark for portfolio manager app
Thuy, if you decide to go with HBase for external storage, consider using a lightweight SQL layer such as Apache Phoenix. It has a Spark plugin <https://phoenix.apache.org/phoenix_spark.html> and a JDBC driver, and throughput is pretty good even for a heavy market data feed (make sure to use batched commits). In our case we send Kafka streams directly into HBase via Phoenix JDBC upserts <https://phoenix.apache.org/language/index.html#upsert_values>, and Spark DataFrames are mapped to Phoenix tables for downstream analytics. Alternatively you can use Cassandra <https://databricks.com/blog/2015/06/16/zen-and-the-art-of-spark-maintenance-with-cassandra.html> for the backend, but Phoenix saves you a lot of coding, and a lot of optimizations for joins and aggregations are already done for you (it plugs into HBase coprocessors).

Alex
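The "batched commits" advice above is the key to Phoenix upsert throughput. The real thing goes through JDBC (addBatch/executeBatch plus commit); a generic, runnable Python sketch of just the batching pattern, where `flush` stands in for the JDBC batch commit and the batch size is an assumption:

```python
# Buffer rows and flush them in groups instead of committing one row at a
# time - the pattern behind high-throughput Phoenix/JDBC upserts.
BATCH_SIZE = 3

class BatchedWriter:
    def __init__(self, flush):
        self.flush = flush      # stand-in for executeBatch() + commit()
        self.buffer = []

    def upsert(self, row):
        self.buffer.append(row)
        if len(self.buffer) >= BATCH_SIZE:
            self.flush(self.buffer)
            self.buffer = []

    def close(self):
        # Flush any trailing partial batch.
        if self.buffer:
            self.flush(self.buffer)
            self.buffer = []

commits = []
w = BatchedWriter(lambda rows: commits.append(list(rows)))
for i in range(7):
    w.upsert(("IBM", 150 + i))
w.close()
# 7 rows -> commits of 3, 3, 1 instead of 7 single-row round trips.
```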
Re: Using Spark for portfolio manager app
That's a great answer, Adrian. I found a lot of information here. I have a direction for the application now; I will try your suggestion :)
Re: Using Spark for portfolio manager app
1. Reading from Kafka has exactly-once guarantees – we are using it in production today (with the direct receiver).
   * You will probably have 2 topics; loading both into Spark and joining / unioning as needed is not an issue.
   * There are tons of optimizations you can do there, assuming everything else works.
2. For ad-hoc query I would say you absolutely need to look at external storage.
   * Querying the DStream or Spark's RDDs directly should be done mostly for aggregates/metrics, not by users.
   * If you look at HBase or Cassandra for storage, then 50k writes/sec are not a problem at all, especially combined with a smart client that does batch puts (like asynchbase <https://github.com/OpenTSDB/asynchbase>).
   * You could also consider writing the updates to another Kafka topic and having a different component update the DB, if you think of other optimisations there.
3. By stats I assume you mean metrics (operational or business).
   * There are multiple ways to do this; however, I would not encourage you to query Spark directly, especially if you need an archive/history of your datapoints.
   * We are using OpenTSDB (we already have an HBase cluster) + Grafana for dashboarding.
   * Collecting the metrics is a bit hairy in a streaming app – we have experimented with both accumulators and RDDs specific for metrics, and chose the RDDs that write to OpenTSDB using foreachRDD.

-adrian

From: Thúy Hằng Lê <thuyhang...@gmail.com>
Sent: Sunday, September 20, 2015 7:26 AM
To: Jörn Franke
Cc: user@spark.apache.org
Subject: Re: Using Spark for portfolio manager app

Thanks Adrian and Jörn for the answers.

Yes, you're right, there are a lot of things I need to consider if I want to use Spark for my app.

I still have a few concerns/questions:

1/ I need to combine the trading stream with the tick stream; I am planning to use Kafka for that. If I use approach #2 (Direct Approach) in this tutorial, https://spark.apache.org/docs/latest/streaming-kafka-integration.html, will I receive exactly-once semantics, or do I have to add some logic in my code to achieve that? Given your suggestion of using delta updates, exactly-once semantics are required for this application.

2/ For ad-hoc query, I must output from Spark to external storage and query on that, right? Is there any way to do ad-hoc queries on Spark? My application could have 50k updates per second at peak time, and persisting to external storage leads to high latency in my app.

3/ How do I get real-time statistics from Spark? In most of the Spark Streaming examples, the statistics are echoed to stdout. However, I want to display those statistics on a GUI. Is there any way to retrieve data from Spark directly without using external storage?
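One item in the list above — writing the updates to another Kafka topic and letting a different component update the DB — is essentially a write-behind queue. A single-process Python model of that decoupling, with an in-memory deque standing in for the Kafka topic and a dict standing in for the DB (both are stand-ins, not real integrations):

```python
from collections import deque

# The streaming job only enqueues updates (cheap, non-blocking); an
# independent consumer drains them and applies them to the store at its
# own pace, where it can batch, retry, or rate-limit.
topic = deque()

def produce(updates):
    topic.extend(updates)          # streaming side

def consume(store, max_records):
    applied = 0
    while topic and applied < max_records:
        key, value = topic.popleft()
        store[key] = value         # DB side
        applied += 1
    return applied

db = {}
produce([("userA", 130), ("userB", 70)])
produce([("userA", 145)])
while consume(db, max_records=2):
    pass
```

The benefit is that a slow or flaky database no longer backs up the streaming batches; the cost is the extra component and the lag of the queue.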
Re: Using Spark for portfolio manager app
I think generally the way forward would be to put aggregate statistics in external storage (e.g. HBase) – it should not have that much influence on latency, and you will probably need it anyway if you need to store historical information.

Wrt deltas – always a tricky topic. You may want to work with absolute values, and when the application queries the external datastore it calculates the deltas. Once this works, you can decide whether you still need the delta approach or not.
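Jörn's "store absolute values, derive deltas at query time" suggestion sidesteps the exactly-once problem for deltas: re-writing an absolute value is idempotent, while re-applying a delta is not. A tiny Python sketch, where the snapshot shape and timestamps are assumptions:

```python
# Persist absolute portfolio values per timestamp; compute deltas only
# when queried, instead of trying to write per-batch deltas exactly once.
history = []  # list of (timestamp, {user: absolute_value})

def record(ts, snapshot):
    history.append((ts, dict(snapshot)))

def delta(user, ts_from, ts_to):
    values = {ts: snap.get(user) for ts, snap in history}
    return values[ts_to] - values[ts_from]

record(1, {"userA": 15000})
record(2, {"userA": 15100})
record(3, {"userA": 14900})
```

A duplicated `record` call for the same timestamp would overwrite with the same absolute value, so replayed batches do no harm — which is exactly why this is the easier model to get right.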
Re: Using Spark for portfolio manager app
Hi Thuy,

You can check RDD.lookup(). It requires that the RDD is partitioned and, of course, cached in memory. Or you may consider a distributed cache like Ehcache or AWS ElastiCache.

I think external storage is an option too, especially NoSQL databases – they can handle updates at high speed, in constant time.

Cheers,
Huy.
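The reason `RDD.lookup()` wants a partitioned RDD: with a known partitioner, a key maps to exactly one partition, so only that partition is scanned instead of all of them. A plain-Python model of that idea (the hash partitioning and sample data are illustrative):

```python
# With a known partitioner, lookup(key) touches exactly one partition.
NUM_PARTITIONS = 4

def partition_of(key):
    return hash(key) % NUM_PARTITIONS

partitions = [[] for _ in range(NUM_PARTITIONS)]
for key, value in [("IBM", 151), ("AAPL", 23), ("GOOG", 540)]:
    partitions[partition_of(key)].append((key, value))

def lookup(key):
    # Scan only the single partition that can contain the key.
    return [v for k, v in partitions[partition_of(key)] if k == key]
```

Without a partitioner, every partition would have to be scanned for every lookup — the difference between O(n / p) and O(n) work per query.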
Re: Using Spark for portfolio manager app
If you want to let your users query their portfolios, then you may want to think about storing the current state of the portfolios in HBase/Phoenix; alternatively, a cluster of relational databases can make sense. For the rest you may use Spark.
Re: Using Spark for portfolio manager app
Cool use case! You should definitely be able to model it with Spark.

The first question is pretty easy – you probably need to keep the user portfolios as state using updateStateByKey. You need to consume 2 event sources: user trades and stock price changes. You probably want to cogroup the stock changes with the users that have that stock in their portfolio, then union the 2 message streams. As messages come in, you consume the union of these 2 streams and update the state. Messages modeled as case classes and a pattern match should do the trick (assuming Scala). After the update, you emit a tuple of (newPortfolio, gainOrLost) so you can also push the deltas somewhere.

For the SQL part, you need to create a DataFrame out of the user-portfolio DStream, using foreachRDD. Look around for examples of SQL + Spark Streaming; I think Databricks had a sample app / tutorial. You can then query the resulting DataFrame using SQL. If instead of one update you want to provide a graph, then you need to use a window over the gainOrLost.

That being said, there are a lot of interesting questions you'll need to answer about state keeping, event sourcing, persistence, and durability – especially around outputting data out of Spark, where you need to do more work to achieve exactly-once semantics. I only focused on the main dataflow.

Hope this helps; that's how I'd model it, anyway :)

-adrian

Sent from my iPhone

> On 19 Sep 2015, at 05:43, Thúy Hằng Lê wrote:
>
> Hi all,
>
> I am going to build a financial application for portfolio managers, where each
> portfolio contains a list of stocks, the number of shares purchased, and the
> purchase price.
> Another source of information is stock prices from market data. The
> application needs to calculate the real-time gain or loss of each stock in each
> portfolio (compared to the purchase price).
>
> I am new to Spark. I know that using Spark Streaming I can aggregate portfolio
> positions in real time, for example:
> user A contains:
> - 100 IBM shares with transactionValue=$15000
> - 500 AAPL shares with transactionValue=$11400
>
> Now, given that stock prices change in real time too, e.g. if IBM is priced at
> 151, I want to update its gain or loss: gainOrLost(IBM) = 151*100 - 15000 = $100
>
> My questions are:
>
> * What is the best method to combine 2 real-time streams (transactions
> made by users and market pricing data) in Spark?
> * How can I run real-time ad-hoc SQL against portfolio positions? Is
> there any way to do SQL on the output of Spark Streaming?
> For example:
> select sum(gainOrLost) from portfolio where user='A';
> * What are the preferred external storages for Spark in this use case?
> * Is Spark the right choice for my use case?
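The worked example in the question (gainOrLost(IBM) = 151*100 - 15000 = $100) generalizes directly. A small Python sketch of the position/price combination and the per-user sum that the `select sum(gainOrLost)` query would compute, using the example's own numbers:

```python
# Each position holds (shares, total purchase cost); gain or loss is
# current market value minus cost. Data mirrors the example in the thread.
portfolios = {
    "A": {"IBM": (100, 15000), "AAPL": (500, 11400)},
}
prices = {"IBM": 151, "AAPL": 23}

def gain_or_loss(user, symbol):
    shares, cost = portfolios[user][symbol]
    return prices[symbol] * shares - cost

def total_gain(user):
    # Equivalent of: select sum(gainOrLost) from portfolio where user='A'
    return sum(gain_or_loss(user, s) for s in portfolios[user])
```

In the streaming version, `prices` is fed by the market-data stream and `portfolios` by the trade stream (the state kept in updateStateByKey); the AAPL price of 23 is an assumed figure for illustration.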