Re: in joins, does one side stream?
Got it..thnx Reynold.. On 20 Sep 2015 07:08, "Reynold Xin" wrote: > The RDDs themselves are not materialized, but the implementations can > materialize. > > E.g. in cogroup (which is used by RDD.join), it materializes all the data > during grouping. > > In SQL/DataFrame join, depending on the join: > > 1. For broadcast join, only the smaller side is materialized in memory as > a hash table. > > 2. For sort-merge join, both sides are sorted & streamed through -- > however, one of the sides need to buffer all the rows having the same join > key in order to perform the join. > > > > On Sat, Sep 19, 2015 at 12:55 PM, Rishitesh Mishra < > rishi80.mis...@gmail.com> wrote: > >> Hi Reynold, >> Can you please elaborate on this. I thought RDD also opens only an >> iterator. Does it get materialized for joins? >> >> Rishi >> >> On Saturday, September 19, 2015, Reynold Xin wrote: >> >>> Yes for RDD -- both are materialized. No for DataFrame/SQL - one side >>> streams. >>> >>> >>> On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers >>> wrote: >>> in scalding we join with the smaller side on the left, since the smaller side will get buffered while the bigger side streams through the join. looking at CoGroupedRDD i do not get the impression such a distiction is made. it seems both sided are put into a map that can spill to disk. is this correct? thanks >>> >>> >
Re: Using Spark for portfolio manager app
I think generally the way forward would be to put aggregate statistics to an external storage (eg hbase) - it should not have that much influence on latency. You will probably need it anyway if you need to store historical information. Wrt to deltas - always a tricky topic. You may want to work with absolute values and when the application queries the external datastore then it calculates deltas. Once this works you can think if you still need to do the delta approach or not. Le dim. 20 sept. 2015 à 6:26, Thúy Hằng Lê a écrit : > Thanks Adrian and Jorn for the answers. > > Yes, you're right there are lot of things I need to consider if I want to > use Spark for my app. > > I still have few concerns/questions from your information: > > 1/ I need to combine trading stream with tick stream, I am planning to use > Kafka for that > If I am using approach #2 (Direct Approach) in this tutorial > https://spark.apache.org/docs/latest/streaming-kafka-integration.html > Will I receive exactly one semantics? Or I have to add some logic in my > code to archive that. > As your suggestion of using delta update, exactly one semantic is required > for this application. > > 2/ For ad-hoc query, I must output of Spark to external storage and query > on that right? > Is there any way to do ah-hoc query on Spark? my application could have > 50k updates per second at pick time. > Persistent to external storage lead to high latency in my app. > > 3/ How to get real-time statistics from Spark, > In most of the Spark streaming examples, the statistics are echo to the > stdout. > However, I want to display those statics on GUI, is there any way to > retrieve data from Spark directly without using external Storage? > > > 2015-09-19 16:23 GMT+07:00 Jörn Franke : > >> If you want to be able to let your users query their portfolio then you >> may want to think about storing the current state of the portfolios in >> hbase/phoenix or alternatively a cluster of relationaldatabases can make >> sense. For the rest you may use Spark. >> >> Le sam. 19 sept. 2015 à 4:43, Thúy Hằng Lê a >> écrit : >> >>> Hi all, >>> >>> I am going to build a financial application for Portfolio Manager, where >>> each portfolio contains a list of stocks, the number of shares purchased, >>> and the purchase price. >>> Another source of information is stocks price from market data. The >>> application need to calculate real-time gain or lost of each stock in each >>> portfolio ( compared to the purchase price). >>> >>> I am new with Spark, i know using Spark Streaming I can aggregate >>> portfolio possitions in real-time, for example: >>> user A contains: >>> - 100 IBM stock with transactionValue=$15000 >>> - 500 AAPL stock with transactionValue=$11400 >>> >>> Now given the stock prices change in real-time too, e.g if IBM price at >>> 151, i want to update the gain or lost of it: gainOrLost(IBM) = 151*100 - >>> 15000 = $100 >>> >>> My questions are: >>> >>> * What is the best method to combine 2 real-time streams( >>> transaction made by user and market pricing data) in Spark. >>> * How can I use real-time Adhoc SQL again >>> portfolio's positions, is there any way i can do SQL on the output of Spark >>> Streamming. >>> For example, >>> select sum(gainOrLost) from portfolio where user='A'; >>> * What are prefered external storages for Spark in this use >>> case. >>> * Is spark is right choice for my use case? >>> >>> >> >
Re: Using Spark for portfolio manager app
Hi Thuy, You can check Rdd.lookup(). It requires the rdd is partitioned, and of course, cached in memory. Or you may consider a distributed cache like ehcache, aws elastic cache. I think an external storage is an option, too. Especially nosql databases, they can handle updates at high speed, at constant time. Cheers, Huy. On Sun, Sep 20, 2015 at 11:26 AM Thúy Hằng Lê wrote: > Thanks Adrian and Jorn for the answers. > > Yes, you're right there are lot of things I need to consider if I want to > use Spark for my app. > > I still have few concerns/questions from your information: > > 1/ I need to combine trading stream with tick stream, I am planning to use > Kafka for that > If I am using approach #2 (Direct Approach) in this tutorial > https://spark.apache.org/docs/latest/streaming-kafka-integration.html > Will I receive exactly one semantics? Or I have to add some logic in my > code to archive that. > As your suggestion of using delta update, exactly one semantic is required > for this application. > > 2/ For ad-hoc query, I must output of Spark to external storage and query > on that right? > Is there any way to do ah-hoc query on Spark? my application could have > 50k updates per second at pick time. > Persistent to external storage lead to high latency in my app. > > 3/ How to get real-time statistics from Spark, > In most of the Spark streaming examples, the statistics are echo to the > stdout. > However, I want to display those statics on GUI, is there any way to > retrieve data from Spark directly without using external Storage? > > > 2015-09-19 16:23 GMT+07:00 Jörn Franke : > >> If you want to be able to let your users query their portfolio then you >> may want to think about storing the current state of the portfolios in >> hbase/phoenix or alternatively a cluster of relationaldatabases can make >> sense. For the rest you may use Spark. >> >> Le sam. 19 sept. 2015 à 4:43, Thúy Hằng Lê a >> écrit : >> >>> Hi all, >>> >>> I am going to build a financial application for Portfolio Manager, where >>> each portfolio contains a list of stocks, the number of shares purchased, >>> and the purchase price. >>> Another source of information is stocks price from market data. The >>> application need to calculate real-time gain or lost of each stock in each >>> portfolio ( compared to the purchase price). >>> >>> I am new with Spark, i know using Spark Streaming I can aggregate >>> portfolio possitions in real-time, for example: >>> user A contains: >>> - 100 IBM stock with transactionValue=$15000 >>> - 500 AAPL stock with transactionValue=$11400 >>> >>> Now given the stock prices change in real-time too, e.g if IBM price at >>> 151, i want to update the gain or lost of it: gainOrLost(IBM) = 151*100 - >>> 15000 = $100 >>> >>> My questions are: >>> >>> * What is the best method to combine 2 real-time streams( >>> transaction made by user and market pricing data) in Spark. >>> * How can I use real-time Adhoc SQL again >>> portfolio's positions, is there any way i can do SQL on the output of Spark >>> Streamming. >>> For example, >>> select sum(gainOrLost) from portfolio where user='A'; >>> * What are prefered external storages for Spark in this use >>> case. >>> * Is spark is right choice for my use case? >>> >>> >> >
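To make the lookup() suggestion concrete, a tiny sketch (the keys, values, and partition count are all invented); lookup() only behaves like a point query when the RDD has a known partitioner and is cached:

import org.apache.spark.HashPartitioner

// toy stand-in for the latest gain/loss per (user, symbol) produced by the streaming job
val positions = sc.parallelize(Seq((("A", "IBM"), 100.0), (("A", "AAPL"), -250.0)))
  .partitionBy(new HashPartitioner(16))
  .cache()

positions.lookup(("A", "IBM")) // only the partition owning the key is scanned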
Re: question building spark in a virtual machine
I allocated almost 6GB of RAM to the ubuntu virtual machine and got the same problem. I will go over this post and try to zoom in into the java vm settings. meanwhile - can someone with a working ubuntu machine can specify her JVM settings? Thanks, Eyal On Sat, Sep 19, 2015 at 7:49 PM, Ted Yu wrote: > Please read this article: > > http://blogs.vmware.com/apps/2011/06/taking-a-closer-look-at-sizing-the-java-process.html > > Can you increase the memory given to the ubuntu virtual machine ? > > Cheers > > On Sat, Sep 19, 2015 at 9:30 AM, Eyal Altshuler > wrote: > >> Hi, >> >> I allocate 4GB for the ubuntu virtual machine, how to check what is the >> maximal available for a jvm process? >> Regarding the thread - I see it's related to building on windows. >> >> Thanks, >> Eyal >> >> On Sat, Sep 19, 2015 at 6:54 PM, Ted Yu wrote: >> >>> See also this thread: >>> >>> https://bukkit.org/threads/complex-craftbukkit-server-and-java-problem-could-not-reserve-enough-space-for-object-heap.155192/ >>> >>> Cheers >>> >>> On Sat, Sep 19, 2015 at 8:51 AM, Aniket Bhatnagar < >>> aniket.bhatna...@gmail.com> wrote: >>> Hi Eval Can you check if your Ubuntu VM has enough RAM allocated to run JVM of size 3gb? thanks, Aniket On Sat, Sep 19, 2015, 9:09 PM Eyal Altshuler wrote: > Hi, > > I had configured the MAVEN_OPTS environment variable the same as you > wrote. > My java version is 1.7.0_75. > I didn't customized the JVM heap size specifically. Is there an > additional configuration I have to run besides the MAVEN_OPTS > configutaion? > > Thanks, > Eyal > > On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu wrote: > >> Can you tell us how you configured the JVM heap size ? >> Which version of Java are you using ? >> >> When I build Spark, I do the following: >> >> export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M >> -XX:ReservedCodeCacheSize=512m" >> >> Cheers >> >> On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler < >> eyal.altshu...@gmail.com> wrote: >> >>> Hi, >>> Trying to build spark in my ubuntu virtual machine, I am getting the >>> following error: >>> >>> "Error occurred during initialization of VM >>> Could not reserve enough space for object heap >>> Error: could not create the Java Virtual Machine. >>> Error: A fatal exception has occurred. Program will exit". >>> >>> I have configured the JVM heap size correctly. >>> >>> How can I fix it? >>> >>> Thanks, >>> Eyal >>> >> >> > >>> >> >
DataGenerator for streaming application
Hi, I am trying to build a data generator that feeds a streaming application. The data generator just reads a file and sends its lines through a socket. I get no errors in the logs, and the benchmark below always prints "Received 0 records". Am I doing something wrong?

import java.io.{IOException, PrintWriter}
import java.net.ServerSocket
import scala.io.Source

object MyDataGenerator {

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: RawTextSender ")
      System.exit(1)
    }
    // Parse the arguments using a pattern match
    val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)

    val serverSocket = new ServerSocket(port)
    println("Listening on port " + port)

    while (true) {
      val socket = serverSocket.accept()
      println("Got a new connection")
      val out = new PrintWriter(socket.getOutputStream)
      try {
        var count = 0
        var startTimestamp = -1
        for (line <- Source.fromFile(file).getLines()) {
          val ts = line.substring(2, line.indexOf(',', 2)).toInt
          if (startTimestamp < 0) startTimestamp = ts
          if (ts - startTimestamp <= 30) {
            out.println(line)
            count += 1
          } else {
            println(s"Emitted reports: $count")
            count = 0
            out.flush()
            startTimestamp = ts
            Thread.sleep(sleepMillis)
          }
        }
      } catch {
        case e: IOException =>
          println("Client disconnected")
          socket.close()
      }
    }
  }
}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, StreamingContext}

object Benchmark {

  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: RawNetworkGrep ")
      System.exit(1)
    }
    val (numStreams, host, port, batchMillis) = (args(0).toInt, args(1), args(2).toInt, args(3).toInt)

    val sparkConf = new SparkConf()
    sparkConf.setAppName("BenchMark")
    sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.executor.extraJavaOptions", " -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")
    if (sparkConf.getOption("spark.master") == None) {
      // Master not set, so this was not launched through spark-submit. Setting master as local.
      sparkConf.setMaster("local[*]")
    }

    // Create the context
    val ssc = new StreamingContext(sparkConf, Duration(batchMillis))

    val rawStreams = (1 to numStreams).map(_ =>
      ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
    val union = ssc.union(rawStreams)
    union.count().map(c => s"Received $c records").print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks.
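One thing worth double-checking, offered as a guess since I can't see how the two programs are paired in your setup: Spark's rawSocketStream expects length-prefixed blocks serialized with Spark's serializer (the format RawTextSender produces), while the generator above writes plain newline-delimited text with a PrintWriter. For plain text lines, socketTextStream is usually the matching receiver, roughly:

// hedged sketch: receive newline-delimited text instead of serialized blocks
val rawStreams = (1 to numStreams).map(_ =>
  ssc.socketTextStream(host, port, StorageLevel.MEMORY_ONLY_SER)).toArray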
Re: Using Spark for portfolio manager app
Thanks Adrian and Jorn for the answers.

Yes, you're right, there are a lot of things I need to consider if I want to use Spark for my app.

I still have a few concerns/questions from your information:

1/ I need to combine the trading stream with the tick stream, and I am planning to use Kafka for that. If I use approach #2 (Direct Approach) in this tutorial https://spark.apache.org/docs/latest/streaming-kafka-integration.html will I get exactly-once semantics, or do I have to add some logic in my code to achieve that? Following your suggestion of using delta updates, exactly-once semantics are required for this application.

2/ For ad-hoc queries, I must write the output of Spark to external storage and query that, right? Is there any way to do ad-hoc queries on Spark itself? My application could have 50k updates per second at peak time, and persisting to external storage leads to high latency in my app.

3/ How do I get real-time statistics out of Spark? In most of the Spark Streaming examples, the statistics are echoed to stdout. However, I want to display those statistics on a GUI; is there any way to retrieve data from Spark directly without using external storage?

2015-09-19 16:23 GMT+07:00 Jörn Franke : > If you want to be able to let your users query their portfolio then you > may want to think about storing the current state of the portfolios in > hbase/phoenix or alternatively a cluster of relationaldatabases can make > sense. For the rest you may use Spark. > > Le sam. 19 sept. 2015 à 4:43, Thúy Hằng Lê a > écrit : > >> Hi all, >> >> I am going to build a financial application for Portfolio Manager, where >> each portfolio contains a list of stocks, the number of shares purchased, >> and the purchase price. >> Another source of information is stocks price from market data. The >> application need to calculate real-time gain or lost of each stock in each >> portfolio ( compared to the purchase price). >> >> I am new with Spark, i know using Spark Streaming I can aggregate >> portfolio possitions in real-time, for example: >> user A contains: >> - 100 IBM stock with transactionValue=$15000 >> - 500 AAPL stock with transactionValue=$11400 >> >> Now given the stock prices change in real-time too, e.g if IBM price at >> 151, i want to update the gain or lost of it: gainOrLost(IBM) = 151*100 - >> 15000 = $100 >> >> My questions are: >> >> * What is the best method to combine 2 real-time streams( >> transaction made by user and market pricing data) in Spark. >> * How can I use real-time Adhoc SQL again portfolio's positions, >> is there any way i can do SQL on the output of Spark Streamming. >> For example, >> select sum(gainOrLost) from portfolio where user='A'; >> * What are prefered external storages for Spark in this use case. >> * Is spark is right choice for my use case? >> >> >
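For question 1, a minimal sketch of one way to combine two feeds in Spark Streaming: key both streams by symbol and join them within each batch. Everything in it is illustrative: the socket sources, hosts/ports, field layouts and names are assumptions, and in a real deployment the Kafka direct streams (and typically a stateful operator such as updateStateByKey for the positions) would take their place.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PortfolioStreamsSketch {
  def main(args: Array[String]): Unit = {
    // local master only so the sketch can run standalone
    val conf = new SparkConf().setMaster("local[4]").setAppName("PortfolioSketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // hypothetical trade feed, one line per fill: "user,symbol,shares,cost" -> keyed by symbol
    val trades = ssc.socketTextStream("localhost", 9001)
      .map(_.split(","))
      .map(a => (a(1), (a(0), a(2).toInt, a(3).toDouble)))

    // hypothetical tick feed: "symbol,price" -> keyed by symbol
    val ticks = ssc.socketTextStream("localhost", 9002)
      .map(_.split(","))
      .map(a => (a(0), a(1).toDouble))

    // join the two streams on symbol within each batch and compute gain or loss per (user, symbol)
    val gains = trades.join(ticks).map { case (symbol, ((user, shares, cost), price)) =>
      ((user, symbol), shares * price - cost)
    }

    gains.print()
    ssc.start()
    ssc.awaitTermination()
  }
}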
Re: PrunedFilteredScan does not work for UDTs and Struct fields
Hi Richard, I am not sure how to support user-defined types. But regarding your second question, you can use a workaround like the following. Suppose you have a struct a and want to filter on a.c with a.c > X. You can define an alias C for a.c and add the extra column C to the schema of the relation, so your query becomes C > X instead of a.c > X. That way, in buildScan you will receive GreaterThan(C, X), which you can programmatically translate back to a.c. Note that the required columns passed to buildScan will then also contain the extra column C, which you need to return in the RDD built by buildScan. It looks complicated, but I think it would work. Thanks. Zhan Zhang From: Richard Eggert Sent: Saturday, September 19, 2015 3:59 PM To: User Subject: PrunedFilteredScan does not work for UDTs and Struct fields I defined my own relation (extending BaseRelation) and implemented the PrunedFilteredScan interface, but discovered that if the column referenced in a WHERE = clause is a user-defined type or a field of a struct column, then Spark SQL passes NO filters to the PrunedFilteredScan.buildScan method, rendering the interface useless. Is there really no way to implement a relation to optimize on such fields? -- Rich
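A compressed sketch of that alias trick, with every name (NestedRelation, the a.b/a.c fields, the underlying (b, c) pairs) invented for illustration and the real data source replaced by a plain RDD; it only shows how the alias column and the filter translation could be wired into a PrunedFilteredScan:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan, PrunedFilteredScan}
import org.apache.spark.sql.types._

// hypothetical relation whose rows carry a struct a = {b, c}, backed by (b, c) pairs
class NestedRelation(val sqlContext: SQLContext, data: RDD[(Int, Int)])
    extends BaseRelation with PrunedFilteredScan {

  // expose the struct itself plus an alias column C that mirrors a.c
  override val schema = StructType(Seq(
    StructField("a", StructType(Seq(StructField("b", IntegerType), StructField("c", IntegerType)))),
    StructField("C", IntegerType)))

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // translate filters on the alias back to the nested field before scanning
    val thresholds = filters.collect { case GreaterThan("C", v: Int) => v }
    val pruned = thresholds.foldLeft(data)((rdd, t) => rdd.filter(_._2 > t))
    pruned.map { case (b, c) =>
      Row.fromSeq(requiredColumns.map {
        case "a" => Row(b, c)
        case "C" => c // the alias column must also be returned when it is requested
      })
    }
  }
}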
Re: in joins, does one side stream?
The RDDs themselves are not materialized, but the implementations can materialize. E.g. in cogroup (which is used by RDD.join), it materializes all the data during grouping.

In SQL/DataFrame join, depending on the join:

1. For broadcast join, only the smaller side is materialized in memory as a hash table.

2. For sort-merge join, both sides are sorted & streamed through -- however, one of the sides needs to buffer all the rows having the same join key in order to perform the join.

On Sat, Sep 19, 2015 at 12:55 PM, Rishitesh Mishra wrote: > Hi Reynold, > Can you please elaborate on this. I thought RDD also opens only an > iterator. Does it get materialized for joins? > > Rishi > > On Saturday, September 19, 2015, Reynold Xin wrote: > >> Yes for RDD -- both are materialized. No for DataFrame/SQL - one side >> streams. >> >> >> On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers >> wrote: >> >>> in scalding we join with the smaller side on the left, since the smaller >>> side will get buffered while the bigger side streams through the join. >>> >>> looking at CoGroupedRDD i do not get the impression such a distiction is >>> made. it seems both sided are put into a map that can spill to disk. is >>> this correct? >>> >>> thanks >>> >> >>
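To make the broadcast case concrete for the DataFrame API, a minimal sketch (the orders/customers DataFrames and the join key are made up); the small side is built into an in-memory hash table and the large side streams past it:

import org.apache.spark.sql.functions.broadcast

// hypothetical DataFrames: orders is large, customers is small
val joined = orders.join(broadcast(customers), "customer_id")
joined.explain() // the plan should show a broadcast hash join rather than a sort-merge join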
PrunedFilteredScan does not work for UDTs and Struct fields
I defined my own relation (extending BaseRelation) and implemented the PrunedFilteredScan interface, but discovered that if the column referenced in a WHERE = clause is a user-defined type or a field of a struct column, then Spark SQL passes NO filters to the PrunedFilteredScan.buildScan method, rendering the interface useless. Is there really no way to implement a relation to optimize on such fields? -- Rich
Re: Unable to see my kafka spark streaming output
Hi All, figured it out: I forgot to set the master to local[2]; at least two local threads are required (one for the receiver and one for processing).

package com.examples

/**
 * Created by kalit_000 on 19/09/2015.
 */
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkStreamingKafka {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaStreaming").set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    val zkQuorm = "localhost:2181"
    val group = "test-group"
    val topics = "first"
    val numThreads = 1

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lineMap = KafkaUtils.createStream(ssc, zkQuorm, group, topicMap)
    val lines = lineMap.map(_._2)

    lines.print

    //lines.print()
    //val words=lines.flatMap(_.split(" "))
    //val pair=words.map( x => (x,1))
    //val wordcount=pair.reduceByKeyAndWindow(_+_,_-_,Minutes(1),Seconds(2),2)
    //wordcount.print

    //ssc.checkpoint("hdfs://localhost:9000/user/hduser/checkpoint")
    ssc.checkpoint("C:\\scalatutorials\\sparkstreaming_checkpoint_folder")
    //C:\scalatutorials\sparkstreaming_checkpoint_folder

    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks
Sri

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-see-my-kafka-spark-streaming-output-tp24750p24751.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Unable to see my kafka spark streaming output
Hi All, I am unable to see the output getting printed in the console; can anyone help?

package com.examples

/**
 * Created by kalit_000 on 19/09/2015.
 */
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkStreamingKafka {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local").setAppName("KafkaStreaming").set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    val zkQuorm = "localhost:2181"
    val group = "test-group"
    val topics = "first"
    val numThreads = 1

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lineMap = KafkaUtils.createStream(ssc, zkQuorm, group, topicMap)
    val lines = lineMap.map(_._2)

    //lines.print()
    val words = lines.flatMap(_.split(" "))
    val pair = words.map(x => (x, 1))
    val wordcount = pair.reduceByKeyAndWindow(_+_, _-_, Minutes(1), Seconds(2), 2)
    wordcount.print

    //ssc.checkpoint("hdfs://localhost:9000/user/hduser/checkpoint")
    ssc.checkpoint("C:\\scalatutorials\\sparkstreaming_checkpoint_folder")
    //C:\scalatutorials\sparkstreaming_checkpoint_folder

    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks
Sri

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-see-my-kafka-spark-streaming-output-tp24750.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Kafka createDirectStream issue
Hi, I am trying to develop the same code in IntelliJ IDEA and I am running into the same issue; is there any workaround?

Error in IntelliJ: cannot resolve symbol createDirectStream

import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.streaming._
import org.apache.spark.streaming.kafka._

object SparkKafkaOffsetTest {

  def main(args: Array[String]): Unit = {

    //Logger.getLogger("org").setLevel(Level.WARN)
    //Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local").setAppName("KafkaOffsetStreaming").set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    val zkQuorm = "localhost:2181"
    val group = "test-group"
    val topics = "first"
    val numThreads = 1
    val broker = "localhost:9091"

    val kafkaParams = Map[String, String]("metadata.broker.list" -> broker)
    //val kafkaParams = Map[String, String]("metadata.broker.list" )

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    //val lineMap=KafkaUtils.createStream(ssc,zkQuorm,group,topicMap)

    //val directKafkaStream=KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    //val directKafkaStream = KafkaUtils.createDirectStream[
    //  [key class], [value class], [key decoder class], [value decoder class] ](
    //  streamingContext, [map of Kafka parameters], [set of topics to consume])
  }
}

Thanks
Sri

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-createDirectStream-issue-tp23456p24749.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
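Two things that commonly cause this, offered as guesses since the build file isn't shown: the spark-streaming-kafka artifact (matching your Spark and Scala versions) has to be on the compile classpath, and this overload of createDirectStream takes its topics as a Set[String] rather than a comma-separated String, so the call would look roughly like:

// hedged sketch: pass the topics as a Set[String]
val topicSet = topics.split(",").toSet
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicSet)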
Re: in joins, does one side stream?
Hi Reynold, Can you please elaborate on this. I thought RDD also opens only an iterator. Does it get materialized for joins? Rishi On Saturday, September 19, 2015, Reynold Xin wrote: > Yes for RDD -- both are materialized. No for DataFrame/SQL - one side > streams. > > > On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers > wrote: > >> in scalding we join with the smaller side on the left, since the smaller >> side will get buffered while the bigger side streams through the join. >> >> looking at CoGroupedRDD i do not get the impression such a distiction is >> made. it seems both sided are put into a map that can spill to disk. is >> this correct? >> >> thanks >> > >
Re: Spark on Mesos with Jobs in Cluster Mode Documentation
You can still provide properties through the docker container by putting configuration in the conf directory, but we try to pass all properties submitted from the driver spark-submit through which I believe will override the defaults. This is not what you are seeing? Tim > On Sep 19, 2015, at 9:01 AM, Alan Braithwaite wrote: > > The assumption that the executor has no default properties set in it's > environment through the docker container. Correct me if I'm wrong, but any > properties which are unset in the SparkContext will come from the environment > of the executor will it not? > > Thanks, > - Alan > >> On Sat, Sep 19, 2015 at 1:09 AM, Tim Chen wrote: >> I guess I need a bit more clarification, what kind of assumptions was the >> dispatcher making? >> >> Tim >> >> >>> On Thu, Sep 17, 2015 at 10:18 PM, Alan Braithwaite >>> wrote: >>> Hi Tim, >>> >>> Thanks for the follow up. It's not so much that I expect the executor to >>> inherit the configuration of the dispatcher as I don't expect the >>> dispatcher to make assumptions about the system environment of the executor >>> (since it lives in a docker). I could potentially see a case where you >>> might want to explicitly forbid the defaults, but I can't think of any >>> right now. >>> >>> Otherwise, I'm confused as to why the defaults in the docker image for the >>> executor are just ignored. I suppose that it's the dispatchers job to >>> ensure the exact configuration of the executor, regardless of the defaults >>> set on the executors machine? Is that the assumption being made? I can >>> understand that in contexts which aren't docker driven since jobs could be >>> rolling out in the middle of a config update. Trying to think of this >>> outside the terms of just mesos/docker (since I'm fully aware that docker >>> doesn't rule the world yet). >>> >>> So I can see this from both perspectives now and passing in the properties >>> file will probably work just fine for me, but for my better understanding: >>> When the executor starts, will it read any of the environment that it's >>> executing in or will it just take only the properties given to it by the >>> dispatcher and nothing more? >>> >>> Lemme know if anything needs more clarification and thanks for your mesos >>> contribution to spark! >>> >>> - Alan >>> On Thu, Sep 17, 2015 at 5:03 PM, Timothy Chen wrote: Hi Alan, If I understand correctly, you are setting executor home when you launch the dispatcher and not on the configuration when you submit job, and expect it to inherit that configuration? When I worked on the dispatcher I was assuming all configuration is passed to the dispatcher to launch the job exactly how you will need to launch it with client mode. But indeed it shouldn't crash dispatcher, I'll take a closer look when I get a chance. Can you recommend changes on the documentation, either in email or a PR? Thanks! Tim Sent from my iPhone > On Sep 17, 2015, at 12:29 PM, Alan Braithwaite > wrote: > > Hey All, > > To bump this thread once again, I'm having some trouble using the > dispatcher as well. > > I'm using Mesos Cluster Manager with Docker Executors. I've deployed the > dispatcher as Marathon job. When I submit a job using spark submit, the > dispatcher writes back that the submission was successful and then > promptly dies in marathon. 
Looking at the logs reveals it was hitting > the following line: > > 398: throw new SparkException("Executor Spark home > `spark.mesos.executor.home` is not set!") > > Which is odd because it's set in multiple places (SPARK_HOME, > spark.mesos.executor.home, spark.home, etc). Reading the code, it > appears that the driver desc pulls only from the request and disregards > any other properties that may be configured. Testing by passing --conf > spark.mesos.executor.home=/usr/local/spark on the command line to > spark-submit confirms this. We're trying to isolate the number of places > where we have to set properties within spark and were hoping that it will > be possible to have this pull in the spark-defaults.conf from somewhere, > or at least allow the user to inform the dispatcher through spark-submit > that those properties will be available once the job starts. > > Finally, I don't think the dispatcher should crash in this event. It > seems not exceptional that a job is misconfigured when submitted. > > Please direct me on the right path if I'm headed in the wrong direction. > Also let me know if I should open some tickets for these issues. > > Thanks, > - Alan > >> On Fri, Sep 11, 2015 at 1:05 PM, Tim Chen wrote: >> Yes you can create an issue, or actually contribute a patch
Re: question building spark in a virtual machine
Please read this article: http://blogs.vmware.com/apps/2011/06/taking-a-closer-look-at-sizing-the-java-process.html Can you increase the memory given to the ubuntu virtual machine ? Cheers On Sat, Sep 19, 2015 at 9:30 AM, Eyal Altshuler wrote: > Hi, > > I allocate 4GB for the ubuntu virtual machine, how to check what is the > maximal available for a jvm process? > Regarding the thread - I see it's related to building on windows. > > Thanks, > Eyal > > On Sat, Sep 19, 2015 at 6:54 PM, Ted Yu wrote: > >> See also this thread: >> >> https://bukkit.org/threads/complex-craftbukkit-server-and-java-problem-could-not-reserve-enough-space-for-object-heap.155192/ >> >> Cheers >> >> On Sat, Sep 19, 2015 at 8:51 AM, Aniket Bhatnagar < >> aniket.bhatna...@gmail.com> wrote: >> >>> Hi Eval >>> >>> Can you check if your Ubuntu VM has enough RAM allocated to run JVM of >>> size 3gb? >>> >>> thanks, >>> Aniket >>> >>> On Sat, Sep 19, 2015, 9:09 PM Eyal Altshuler >>> wrote: >>> Hi, I had configured the MAVEN_OPTS environment variable the same as you wrote. My java version is 1.7.0_75. I didn't customized the JVM heap size specifically. Is there an additional configuration I have to run besides the MAVEN_OPTS configutaion? Thanks, Eyal On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu wrote: > Can you tell us how you configured the JVM heap size ? > Which version of Java are you using ? > > When I build Spark, I do the following: > > export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M > -XX:ReservedCodeCacheSize=512m" > > Cheers > > On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler < > eyal.altshu...@gmail.com> wrote: > >> Hi, >> Trying to build spark in my ubuntu virtual machine, I am getting the >> following error: >> >> "Error occurred during initialization of VM >> Could not reserve enough space for object heap >> Error: could not create the Java Virtual Machine. >> Error: A fatal exception has occurred. Program will exit". >> >> I have configured the JVM heap size correctly. >> >> How can I fix it? >> >> Thanks, >> Eyal >> > > >> >
Re: question building spark in a virtual machine
Hi, I allocate 4GB for the ubuntu virtual machine, how to check what is the maximal available for a jvm process? Regarding the thread - I see it's related to building on windows. Thanks, Eyal On Sat, Sep 19, 2015 at 6:54 PM, Ted Yu wrote: > See also this thread: > > https://bukkit.org/threads/complex-craftbukkit-server-and-java-problem-could-not-reserve-enough-space-for-object-heap.155192/ > > Cheers > > On Sat, Sep 19, 2015 at 8:51 AM, Aniket Bhatnagar < > aniket.bhatna...@gmail.com> wrote: > >> Hi Eval >> >> Can you check if your Ubuntu VM has enough RAM allocated to run JVM of >> size 3gb? >> >> thanks, >> Aniket >> >> On Sat, Sep 19, 2015, 9:09 PM Eyal Altshuler >> wrote: >> >>> Hi, >>> >>> I had configured the MAVEN_OPTS environment variable the same as you >>> wrote. >>> My java version is 1.7.0_75. >>> I didn't customized the JVM heap size specifically. Is there an >>> additional configuration I have to run besides the MAVEN_OPTS configutaion? >>> >>> Thanks, >>> Eyal >>> >>> On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu wrote: >>> Can you tell us how you configured the JVM heap size ? Which version of Java are you using ? When I build Spark, I do the following: export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" Cheers On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler < eyal.altshu...@gmail.com> wrote: > Hi, > Trying to build spark in my ubuntu virtual machine, I am getting the > following error: > > "Error occurred during initialization of VM > Could not reserve enough space for object heap > Error: could not create the Java Virtual Machine. > Error: A fatal exception has occurred. Program will exit". > > I have configured the JVM heap size correctly. > > How can I fix it? > > Thanks, > Eyal > >>> >
Re: word count (group by users) in spark
Using scala API, you can first group by user and then use combineByKey. Thanks, Aniket On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com wrote: > Hi All, > I would like to achieve this below output using spark , I managed to write > in Hive and call it in spark but not in just spark (scala), how to group > word counts on particular user (column) for example. > Imagine users and their given tweets I want to do word count based on user > name. > > Input:- > kaliA,B,A,B,B > james B,A,A,A,B > > Output:- > kali A [Count] B [Count] > James A [Count] B [Count] > > My Hive Answer:- > CREATE EXTERNAL TABLE TEST > ( > user_name string , > COMMENTS STRING > > ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' STORED AS TEXTFILE > LOCATION '/data/kali/test'; HDFS FOLDER (create hdfs folder and > create a text file with data mentioned in the email) > > use default;select user_name,COLLECT_SET(text) from (select > user_name,concat(sub,' ',count(comments)) as text from test LATERAL VIEW > explode(split(comments,',')) subView AS sub group by user_name,sub)w group > by user_name; > > Spark With Hive:- > package com.examples > > /** > * Created by kalit_000 on 17/09/2015. > */ > import org.apache.log4j.Logger > import org.apache.log4j.Level > import org.apache.spark.sql.SQLContext > import org.apache.spark.sql.hive.HiveContext > import org.apache.spark.{SparkContext, SparkConf} > import org.apache.spark.SparkContext._ > > > object HiveWordCount { > > def main(args: Array[String]): Unit = > { > Logger.getLogger("org").setLevel(Level.WARN) > Logger.getLogger("akka").setLevel(Level.WARN) > > val conf = new > > SparkConf().setMaster("local").setAppName("HiveWordCount").set("spark.executor.memory", > "1g") > val sc = new SparkContext(conf) > val sqlContext= new SQLContext(sc) > > val hc=new HiveContext(sc) > > hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST (user_name > string ,COMMENTS STRING )ROW FORMAT DELIMITED FIELDS TERMINATED BY '001' > STORED AS TEXTFILE LOCATION '/data/kali/test' ") > > val op=hc.sql("select user_name,COLLECT_SET(text) from (select > user_name,concat(sub,' ',count(comments)) as text from default.test > LATERAL > VIEW explode(split(comments,',')) subView AS sub group by user_name,sub)w > group by user_name") > > op.collect.foreach(println) > > > } > > > > > Thanks > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
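A rough sketch of that in plain Spark, assuming an existing SparkContext `sc` and a '\u0001' field delimiter as in the Hive DDL quoted above (adjust the parsing if the file is laid out differently):

// count each word per user, then gather the per-word counts for each user with combineByKey
val perUser = sc.textFile("/data/kali/test")
  .flatMap { line =>
    val Array(user, comments) = line.split('\u0001')
    comments.split(",").map(word => ((user, word), 1))
  }
  .reduceByKey(_ + _)
  .map { case ((user, word), n) => (user, s"$word [$n]") }
  .combineByKey[List[String]](
    (v: String) => List(v),
    (acc: List[String], v: String) => v :: acc,
    (a: List[String], b: List[String]) => a ++ b)

perUser.collect().foreach(println) // e.g. (kali,List(B [3], A [2]))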
Re: Spark on Mesos with Jobs in Cluster Mode Documentation
The assumption that the executor has no default properties set in it's environment through the docker container. Correct me if I'm wrong, but any properties which are unset in the SparkContext will come from the environment of the executor will it not? Thanks, - Alan On Sat, Sep 19, 2015 at 1:09 AM, Tim Chen wrote: > I guess I need a bit more clarification, what kind of assumptions was the > dispatcher making? > > Tim > > > On Thu, Sep 17, 2015 at 10:18 PM, Alan Braithwaite > wrote: > >> Hi Tim, >> >> Thanks for the follow up. It's not so much that I expect the executor to >> inherit the configuration of the dispatcher as I* don't *expect the >> dispatcher to make assumptions about the system environment of the executor >> (since it lives in a docker). I could potentially see a case where you >> might want to explicitly forbid the defaults, but I can't think of any >> right now. >> >> Otherwise, I'm confused as to why the defaults in the docker image for >> the executor are just ignored. I suppose that it's the dispatchers job to >> ensure the *exact* configuration of the executor, regardless of the >> defaults set on the executors machine? Is that the assumption being made? >> I can understand that in contexts which aren't docker driven since jobs >> could be rolling out in the middle of a config update. Trying to think of >> this outside the terms of just mesos/docker (since I'm fully aware that >> docker doesn't rule the world yet). >> >> So I can see this from both perspectives now and passing in the >> properties file will probably work just fine for me, but for my better >> understanding: When the executor starts, will it read any of the >> environment that it's executing in or will it just take only the properties >> given to it by the dispatcher and nothing more? >> >> Lemme know if anything needs more clarification and thanks for your mesos >> contribution to spark! >> >> - Alan >> >> On Thu, Sep 17, 2015 at 5:03 PM, Timothy Chen wrote: >> >>> Hi Alan, >>> >>> If I understand correctly, you are setting executor home when you launch >>> the dispatcher and not on the configuration when you submit job, and expect >>> it to inherit that configuration? >>> >>> When I worked on the dispatcher I was assuming all configuration is >>> passed to the dispatcher to launch the job exactly how you will need to >>> launch it with client mode. >>> >>> But indeed it shouldn't crash dispatcher, I'll take a closer look when I >>> get a chance. >>> >>> Can you recommend changes on the documentation, either in email or a PR? >>> >>> Thanks! >>> >>> Tim >>> >>> Sent from my iPhone >>> >>> On Sep 17, 2015, at 12:29 PM, Alan Braithwaite >>> wrote: >>> >>> Hey All, >>> >>> To bump this thread once again, I'm having some trouble using the >>> dispatcher as well. >>> >>> I'm using Mesos Cluster Manager with Docker Executors. I've deployed >>> the dispatcher as Marathon job. When I submit a job using spark submit, >>> the dispatcher writes back that the submission was successful and then >>> promptly dies in marathon. Looking at the logs reveals it was hitting the >>> following line: >>> >>> 398: throw new SparkException("Executor Spark home >>> `spark.mesos.executor.home` is not set!") >>> >>> Which is odd because it's set in multiple places (SPARK_HOME, >>> spark.mesos.executor.home, spark.home, etc). Reading the code, it >>> appears that the driver desc pulls only from the request and disregards any >>> other properties that may be configured. 
Testing by passing --conf >>> spark.mesos.executor.home=/usr/local/spark on the command line to >>> spark-submit confirms this. We're trying to isolate the number of places >>> where we have to set properties within spark and were hoping that it will >>> be possible to have this pull in the spark-defaults.conf from somewhere, or >>> at least allow the user to inform the dispatcher through spark-submit that >>> those properties will be available once the job starts. >>> >>> Finally, I don't think the dispatcher should crash in this event. It >>> seems not exceptional that a job is misconfigured when submitted. >>> >>> Please direct me on the right path if I'm headed in the wrong >>> direction. Also let me know if I should open some tickets for these issues. >>> >>> Thanks, >>> - Alan >>> >>> On Fri, Sep 11, 2015 at 1:05 PM, Tim Chen wrote: >>> Yes you can create an issue, or actually contribute a patch to update it :) Sorry the docs is a bit light, I'm going to make it more complete along the way. Tim On Fri, Sep 11, 2015 at 11:11 AM, Tom Waterhouse (tomwater) < tomwa...@cisco.com> wrote: > Tim, > > Thank you for the explanation. You are correct, my Mesos experience > is very light, and I haven’t deployed anything via Marathon yet. What you > have stated here makes sense, I will look into doing this. > > Adding this info to the docs would be great. Is the appropri
Re: question building spark in a virtual machine
See also this thread: https://bukkit.org/threads/complex-craftbukkit-server-and-java-problem-could-not-reserve-enough-space-for-object-heap.155192/ Cheers On Sat, Sep 19, 2015 at 8:51 AM, Aniket Bhatnagar < aniket.bhatna...@gmail.com> wrote: > Hi Eval > > Can you check if your Ubuntu VM has enough RAM allocated to run JVM of > size 3gb? > > thanks, > Aniket > > On Sat, Sep 19, 2015, 9:09 PM Eyal Altshuler > wrote: > >> Hi, >> >> I had configured the MAVEN_OPTS environment variable the same as you >> wrote. >> My java version is 1.7.0_75. >> I didn't customized the JVM heap size specifically. Is there an >> additional configuration I have to run besides the MAVEN_OPTS configutaion? >> >> Thanks, >> Eyal >> >> On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu wrote: >> >>> Can you tell us how you configured the JVM heap size ? >>> Which version of Java are you using ? >>> >>> When I build Spark, I do the following: >>> >>> export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M >>> -XX:ReservedCodeCacheSize=512m" >>> >>> Cheers >>> >>> On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler < >>> eyal.altshu...@gmail.com> wrote: >>> Hi, Trying to build spark in my ubuntu virtual machine, I am getting the following error: "Error occurred during initialization of VM Could not reserve enough space for object heap Error: could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit". I have configured the JVM heap size correctly. How can I fix it? Thanks, Eyal >>> >>> >>
Re: question building spark in a virtual machine
Hi Eval Can you check if your Ubuntu VM has enough RAM allocated to run JVM of size 3gb? thanks, Aniket On Sat, Sep 19, 2015, 9:09 PM Eyal Altshuler wrote: > Hi, > > I had configured the MAVEN_OPTS environment variable the same as you wrote. > My java version is 1.7.0_75. > I didn't customized the JVM heap size specifically. Is there an additional > configuration I have to run besides the MAVEN_OPTS configutaion? > > Thanks, > Eyal > > On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu wrote: > >> Can you tell us how you configured the JVM heap size ? >> Which version of Java are you using ? >> >> When I build Spark, I do the following: >> >> export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M >> -XX:ReservedCodeCacheSize=512m" >> >> Cheers >> >> On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler > > wrote: >> >>> Hi, >>> Trying to build spark in my ubuntu virtual machine, I am getting the >>> following error: >>> >>> "Error occurred during initialization of VM >>> Could not reserve enough space for object heap >>> Error: could not create the Java Virtual Machine. >>> Error: A fatal exception has occurred. Program will exit". >>> >>> I have configured the JVM heap size correctly. >>> >>> How can I fix it? >>> >>> Thanks, >>> Eyal >>> >> >> >
Re: question building spark in a virtual machine
Hi, I had configured the MAVEN_OPTS environment variable exactly as you wrote. My Java version is 1.7.0_75. I didn't customize the JVM heap size specifically. Is there any additional configuration I have to set besides the MAVEN_OPTS configuration? Thanks, Eyal On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu wrote: > Can you tell us how you configured the JVM heap size ? > Which version of Java are you using ? > > When I build Spark, I do the following: > > export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M > -XX:ReservedCodeCacheSize=512m" > > Cheers > > On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler > wrote: > >> Hi, >> Trying to build spark in my ubuntu virtual machine, I am getting the >> following error: >> >> "Error occurred during initialization of VM >> Could not reserve enough space for object heap >> Error: could not create the Java Virtual Machine. >> Error: A fatal exception has occurred. Program will exit". >> >> I have configured the JVM heap size correctly. >> >> How can I fix it? >> >> Thanks, >> Eyal >> > >
Re: question building spark in a virtual machine
Can you tell us how you configured the JVM heap size ? Which version of Java are you using ? When I build Spark, I do the following: export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" Cheers On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler wrote: > Hi, > Trying to build spark in my ubuntu virtual machine, I am getting the > following error: > > "Error occurred during initialization of VM > Could not reserve enough space for object heap > Error: could not create the Java Virtual Machine. > Error: A fatal exception has occurred. Program will exit". > > I have configured the JVM heap size correctly. > > How can I fix it? > > Thanks, > Eyal >
word count (group by users) in spark
Hi All, I would like to achieve the output below using Spark. I managed to write it in Hive and call that from Spark, but not in plain Spark (Scala): how do I group word counts by a particular user (column)? For example, imagine users and their tweets; I want to do a word count per user name.

Input:-
kaliA,B,A,B,B
james B,A,A,A,B

Output:-
kali A [Count] B [Count]
James A [Count] B [Count]

My Hive Answer:-

CREATE EXTERNAL TABLE TEST
(
user_name string ,
COMMENTS STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' STORED AS TEXTFILE LOCATION '/data/kali/test';
-- HDFS FOLDER (create hdfs folder and create a text file with data mentioned in the email)

use default;
select user_name,COLLECT_SET(text) from (select user_name,concat(sub,' ',count(comments)) as text from test LATERAL VIEW explode(split(comments,',')) subView AS sub group by user_name,sub)w group by user_name;

Spark With Hive:-

package com.examples

/**
 * Created by kalit_000 on 17/09/2015.
 */
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object HiveWordCount {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local").setAppName("HiveWordCount").set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val hc = new HiveContext(sc)

    hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST (user_name string ,COMMENTS STRING )ROW FORMAT DELIMITED FIELDS TERMINATED BY '001' STORED AS TEXTFILE LOCATION '/data/kali/test' ")

    val op = hc.sql("select user_name,COLLECT_SET(text) from (select user_name,concat(sub,' ',count(comments)) as text from default.test LATERAL VIEW explode(split(comments,',')) subView AS sub group by user_name,sub)w group by user_name")

    op.collect.foreach(println)

  }
}

Thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Docker/Mesos with Spark
I was searching the 1.5.0 docs on the Docker-on-Mesos capabilities and just found that you CAN run it this way. Are there any user posts, blog posts, etc. on why and how you'd do this?

Basically, at first I was questioning why you'd run Spark in a Docker container, i.e., if you run with a tarballed executor, what are you really gaining? And in this setup, are you losing out on performance somehow? (I am guessing smarter people than I have figured that out.)

Then I came across a situation where I wanted to use a Python library with Spark, and it had to be installed on every node, and I realized one big advantage of dockerized Spark would be that Spark apps that need other libraries could be contained and built well. OK, that's huge, let's do that.

That brings me to the many questions I have about how this actually works. Do cluster mode and client mode apply here? If so, how? Is there a good walkthrough on getting this set up? Limitations? Gotchas? Should I just dive in and start working with it? Has anyone written up any stories/rough documentation?

This seems like a really helpful feature for scaling out Spark and letting developers truly build what they need without tons of admin overhead, so I really want to explore it.

Thanks!
John
question building spark in a virtual machine
Hi, Trying to build spark in my ubuntu virtual machine, I am getting the following error: "Error occurred during initialization of VM Could not reserve enough space for object heap Error: could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit". I have configured the JVM heap size correctly. How can I fix it? Thanks, Eyal
Re: Using Spark for portfolio manager app
If you want to be able to let your users query their portfolio then you may want to think about storing the current state of the portfolios in hbase/phoenix or alternatively a cluster of relationaldatabases can make sense. For the rest you may use Spark. Le sam. 19 sept. 2015 à 4:43, Thúy Hằng Lê a écrit : > Hi all, > > I am going to build a financial application for Portfolio Manager, where > each portfolio contains a list of stocks, the number of shares purchased, > and the purchase price. > Another source of information is stocks price from market data. The > application need to calculate real-time gain or lost of each stock in each > portfolio ( compared to the purchase price). > > I am new with Spark, i know using Spark Streaming I can aggregate > portfolio possitions in real-time, for example: > user A contains: > - 100 IBM stock with transactionValue=$15000 > - 500 AAPL stock with transactionValue=$11400 > > Now given the stock prices change in real-time too, e.g if IBM price at > 151, i want to update the gain or lost of it: gainOrLost(IBM) = 151*100 - > 15000 = $100 > > My questions are: > > * What is the best method to combine 2 real-time streams( > transaction made by user and market pricing data) in Spark. > * How can I use real-time Adhoc SQL again portfolio's positions, > is there any way i can do SQL on the output of Spark Streamming. > For example, > select sum(gainOrLost) from portfolio where user='A'; > * What are prefered external storages for Spark in this use case. > * Is spark is right choice for my use case? > >
Re: Spark on Mesos with Jobs in Cluster Mode Documentation
I guess I need a bit more clarification, what kind of assumptions was the dispatcher making? Tim On Thu, Sep 17, 2015 at 10:18 PM, Alan Braithwaite wrote: > Hi Tim, > > Thanks for the follow up. It's not so much that I expect the executor to > inherit the configuration of the dispatcher as I* don't *expect the > dispatcher to make assumptions about the system environment of the executor > (since it lives in a docker). I could potentially see a case where you > might want to explicitly forbid the defaults, but I can't think of any > right now. > > Otherwise, I'm confused as to why the defaults in the docker image for the > executor are just ignored. I suppose that it's the dispatchers job to > ensure the *exact* configuration of the executor, regardless of the > defaults set on the executors machine? Is that the assumption being made? > I can understand that in contexts which aren't docker driven since jobs > could be rolling out in the middle of a config update. Trying to think of > this outside the terms of just mesos/docker (since I'm fully aware that > docker doesn't rule the world yet). > > So I can see this from both perspectives now and passing in the properties > file will probably work just fine for me, but for my better understanding: > When the executor starts, will it read any of the environment that it's > executing in or will it just take only the properties given to it by the > dispatcher and nothing more? > > Lemme know if anything needs more clarification and thanks for your mesos > contribution to spark! > > - Alan > > On Thu, Sep 17, 2015 at 5:03 PM, Timothy Chen wrote: > >> Hi Alan, >> >> If I understand correctly, you are setting executor home when you launch >> the dispatcher and not on the configuration when you submit job, and expect >> it to inherit that configuration? >> >> When I worked on the dispatcher I was assuming all configuration is >> passed to the dispatcher to launch the job exactly how you will need to >> launch it with client mode. >> >> But indeed it shouldn't crash dispatcher, I'll take a closer look when I >> get a chance. >> >> Can you recommend changes on the documentation, either in email or a PR? >> >> Thanks! >> >> Tim >> >> Sent from my iPhone >> >> On Sep 17, 2015, at 12:29 PM, Alan Braithwaite >> wrote: >> >> Hey All, >> >> To bump this thread once again, I'm having some trouble using the >> dispatcher as well. >> >> I'm using Mesos Cluster Manager with Docker Executors. I've deployed the >> dispatcher as Marathon job. When I submit a job using spark submit, the >> dispatcher writes back that the submission was successful and then promptly >> dies in marathon. Looking at the logs reveals it was hitting the following >> line: >> >> 398: throw new SparkException("Executor Spark home >> `spark.mesos.executor.home` is not set!") >> >> Which is odd because it's set in multiple places (SPARK_HOME, >> spark.mesos.executor.home, spark.home, etc). Reading the code, it >> appears that the driver desc pulls only from the request and disregards any >> other properties that may be configured. Testing by passing --conf >> spark.mesos.executor.home=/usr/local/spark on the command line to >> spark-submit confirms this. We're trying to isolate the number of places >> where we have to set properties within spark and were hoping that it will >> be possible to have this pull in the spark-defaults.conf from somewhere, or >> at least allow the user to inform the dispatcher through spark-submit that >> those properties will be available once the job starts. 
>> >> Finally, I don't think the dispatcher should crash in this event. It >> seems not exceptional that a job is misconfigured when submitted. >> >> Please direct me on the right path if I'm headed in the wrong direction. >> Also let me know if I should open some tickets for these issues. >> >> Thanks, >> - Alan >> >> On Fri, Sep 11, 2015 at 1:05 PM, Tim Chen wrote: >> >>> Yes you can create an issue, or actually contribute a patch to update it >>> :) >>> >>> Sorry the docs is a bit light, I'm going to make it more complete along >>> the way. >>> >>> Tim >>> >>> >>> On Fri, Sep 11, 2015 at 11:11 AM, Tom Waterhouse (tomwater) < >>> tomwa...@cisco.com> wrote: >>> Tim, Thank you for the explanation. You are correct, my Mesos experience is very light, and I haven’t deployed anything via Marathon yet. What you have stated here makes sense, I will look into doing this. Adding this info to the docs would be great. Is the appropriate action to create an issue regarding improvement of the docs? For those of us who are gaining the experience having such a pointer is very helpful. Tom From: Tim Chen Date: Thursday, September 10, 2015 at 10:25 AM To: Tom Waterhouse Cc: "user@spark.apache.org" Subject: Re: Spark on Mesos with Jobs in Cluster Mode Documentation Hi Tom, Sorry the documentation isn't rea
Re: Zeppelin on Yarn : org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submi
yarn-client still runs the executor tasks on the cluster, the main difference is where the driver job runs. Thanks, Ewan -- Original message-- From: shahab Date: Fri, 18 Sep 2015 13:11 To: Aniket Bhatnagar; Cc: user@spark.apache.org; Subject:Re: Zeppelin on Yarn : org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit. It works using yarn-client but I want to make it running on cluster. Is there any way to do so? best, /Shahab On Fri, Sep 18, 2015 at 12:54 PM, Aniket Bhatnagar mailto:aniket.bhatna...@gmail.com>> wrote: Can you try yarn-client mode? On Fri, Sep 18, 2015, 3:38 PM shahab mailto:shahab.mok...@gmail.com>> wrote: Hi, Probably I have wrong zeppelin configuration, because I get the following error when I execute spark statements in Zeppelin: org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit. Anyone knows What's the solution to this? best, /Shahab