Re: weightCol doesn't seem to be handled properly in PySpark
Yep, done. https://issues.apache.org/jira/browse/SPARK-17508 On Mon, Sep 12, 2016 at 9:06 AM Nick Pentreath wrote: > Could you create a JIRA ticket for it? > > https://issues.apache.org/jira/browse/SPARK > > On Thu, 8 Sep 2016 at 07:50 evanzamir wrote: > >> When I am trying to use LinearRegression, it seems that unless there is a >> column specified with weights, it will raise a py4j error. Seems odd >> because >> supposedly the default is weightCol=None, but when I specifically pass in >> weightCol=None to LinearRegression, I get this error. >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/weightCol-doesn-t-seem-to-be-handled-properly-in-PySpark-tp27677.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> >>
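For anyone hitting this before the fix lands, a minimal PySpark sketch of the workaround (column and DataFrame names are illustrative): leave weightCol unset rather than passing None explicitly.

from pyspark.ml.regression import LinearRegression

# Passing weightCol=None explicitly triggers the py4j error described above:
# lr = LinearRegression(featuresCol="features", labelCol="label", weightCol=None)

# Workaround: omit the parameter entirely so that None is never forwarded to the JVM.
lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(training_df)  # training_df is a hypothetical DataFrame with "features"/"label" columns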
Re: I noticed LinearRegression sometimes produces negative R^2 values
Yes, it's on a held-out segment from the data set being fitted. On Wed, Sep 7, 2016 at 1:02 AM Sean Owen <so...@cloudera.com> wrote: > Yes, should be. > It's also not necessarily nonnegative if you evaluate R^2 on a > different data set than you fit it to. Is that the case? > > On Tue, Sep 6, 2016 at 11:15 PM, Evan Zamir <zamir.e...@gmail.com> wrote: > > I am using the default setting for setting fitIntercept, which *should* > be > > TRUE, right? > > > > On Tue, Sep 6, 2016 at 1:38 PM Sean Owen <so...@cloudera.com> wrote: > >> > >> Are you not fitting an intercept / regressing through the origin? with > >> that constraint it's no longer true that R^2 is necessarily > >> nonnegative. It basically means that the errors are even bigger than > >> what you'd get by predicting the data's mean value as a constant > >> model. > >> > >> On Tue, Sep 6, 2016 at 8:49 PM, evanzamir <zamir.e...@gmail.com> wrote: > >> > Am I misinterpreting what r2() in the LinearRegression Model summary > >> > means? > >> > By definition, R^2 should never be a negative number! > >> > > >> > > >> > > >> > -- > >> > View this message in context: > >> > > http://apache-spark-user-list.1001560.n3.nabble.com/I-noticed-LinearRegression-sometimes-produces-negative-R-2-values-tp27667.html > >> > Sent from the Apache Spark User List mailing list archive at > Nabble.com. > >> > > >> > - > >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >> >
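For readers following the thread, a tiny NumPy illustration (made-up numbers) of why R^2 = 1 - SS_res/SS_tot goes negative as soon as the model's errors exceed those of a constant mean predictor, e.g. on a held-out set or when regressing through the origin:

import numpy as np

y_true = np.array([1.0, 2.0, 3.0])
y_pred = np.array([5.0, 5.0, 5.0])              # a model worse than predicting the mean
ss_res = np.sum((y_true - y_pred) ** 2)         # residual sum of squares = 29.0
ss_tot = np.sum((y_true - y_true.mean()) ** 2)  # total sum of squares = 2.0
print(1 - ss_res / ss_tot)                      # -13.5, a perfectly legal negative R^2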
Re: I noticed LinearRegression sometimes produces negative R^2 values
I am using the default setting for setting *fitIntercept*, which *should* be TRUE, right? On Tue, Sep 6, 2016 at 1:38 PM Sean Owen wrote: > Are you not fitting an intercept / regressing through the origin? with > that constraint it's no longer true that R^2 is necessarily > nonnegative. It basically means that the errors are even bigger than > what you'd get by predicting the data's mean value as a constant > model. > > On Tue, Sep 6, 2016 at 8:49 PM, evanzamir wrote: > > Am I misinterpreting what r2() in the LinearRegression Model summary > means? > > By definition, R^2 should never be a negative number! > > > > > > > > -- > > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/I-noticed-LinearRegression-sometimes-produces-negative-R-2-values-tp27667.html > > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > > > - > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > >
[Community] Python support added to Spark Job Server
Hi folks, Just a friendly message that we have added Python support to the REST Spark Job Server project. If you are a Python user looking for a RESTful way to manage your Spark jobs, please come have a look at our project! https://github.com/spark-jobserver/spark-jobserver -Evan - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
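As a taste of the REST interface, a hedged Python sketch of submitting a job (the endpoint shape follows the project's README; host, port, app name, and class path are illustrative and must match your deployment):

import requests

# Submit a job to a running Spark Job Server instance and read back its status.
resp = requests.post(
    "http://localhost:8090/jobs",
    params={"appName": "my-app", "classPath": "example.WordCount"},
    data="input.string = a b c a b",  # job configuration, parsed as Typesafe Config
)
print(resp.json())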
Re: How to add custom steps to Pipeline models?
Thanks, but I should have been more clear that I'm trying to do this in PySpark, not Scala. Using an example I found on SO, I was able to implement a Pipeline step in Python, but it seems it is more difficult (perhaps currently impossible) to make it persist to disk (I tried implementing a _to_java method to no avail). Any ideas about that? On Sun, Aug 14, 2016 at 6:02 PM Jacek Laskowski wrote: > Hi, > > It should just work if you followed the Transformer interface [1]. > When you have the transformers, creating a Pipeline is a matter of > setting them as additional stages (using Pipeline.setStages [2]). > > [1] > https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala > [2] > https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala#L107 > > Best regards, > Jacek Laskowski > > https://medium.com/@jaceklaskowski/ > Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark > Follow me at https://twitter.com/jaceklaskowski > > > On Fri, Aug 12, 2016 at 9:19 AM, evanzamir wrote: > > I'm building an LDA Pipeline, currently with 4 steps, Tokenizer, > > StopWordsRemover, CountVectorizer, and LDA. I would like to add more > steps, > > for example, stemming and lemmatization, and also 1-gram and 2-grams > (which > > I believe is not supported by the default NGram class). Is there a way to > > add these steps? In sklearn, you can create classes with fit() and > > transform() methods, and that should be enough. Is that true in Spark ML > as > > well (or something similar)? > > > > > > > > -- > > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-custom-steps-to-Pipeline-models-tp27522.html > > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > > > - > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > >
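For reference, a minimal sketch of the kind of Python Transformer discussed here (column names are illustrative, the keyword_only plumbing differs slightly across Spark versions, and persistence is exactly the part that still needs a _to_java bridge):

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.functions import col, upper

class UpperCaser(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(UpperCaser, self).__init__()
        # On older releases the kwargs live on self.__init__._input_kwargs instead.
        self.setParams(**self._input_kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        self._set(**self._input_kwargs)
        return self

    def _transform(self, dataset):
        # The actual transformation: add an upper-cased copy of the input column.
        return dataset.withColumn(self.getOutputCol(), upper(col(self.getInputCol())))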
Re: Can we use spark inside a web service?
Andres, A couple points: 1) If you look at my post, you can see that you could use Spark for low-latency queries - many queries could be executed in under a second, with the right technology. It really depends on the definition of "real time", but I believe low latency is definitely possible. 2) Akka-http over SparkContext - this is essentially what Spark Job Server does. (it uses Spray, which is the predecessor to akka-http; we will upgrade once Spark 2.0 is incorporated) 3) Someone else can probably talk about Ignite, but it is based on a distributed object cache. So you define your objects in Java, POJOs, annotate which ones you want indexed, upload your jars, then you can execute queries. It's a different use case than typical OLAP. There is some Spark integration, but then you would have the same bottlenecks going through Spark. On Fri, Mar 11, 2016 at 5:02 AM, Andrés Ivaldi <iaiva...@gmail.com> wrote: > Nice discussion. I have a question about web services with Spark. > > What could be the problem using akka-http as a web service (like Play does), > with one SparkContext created, and the queries over akka-http using only > the instance of that SparkContext? > > Also, about analytics: we are working on real-time analytics and, as Hemant > said, Spark is not a solution for low latency queries. What about using > Ignite for that? > > > On Fri, Mar 11, 2016 at 6:52 AM, Hemant Bhanawat <hemant9...@gmail.com> > wrote: >> >> Spark-jobserver is an elegant product that builds concurrency on top of >> Spark. But, the current design of DAGScheduler prevents Spark from becoming a >> truly concurrent solution for low latency queries. DagScheduler will turn >> out to be a bottleneck for low latency queries. Sparrow project was an >> effort to make Spark more suitable for such scenarios but it never made it >> to the Spark codebase. If Spark has to become a highly concurrent solution, >> scheduling has to be distributed. >> >> Hemant Bhanawat >> www.snappydata.io >> >> On Fri, Mar 11, 2016 at 7:02 AM, Chris Fregly <ch...@fregly.com> wrote: >>> >>> great discussion, indeed. >>> >>> Mark Hamstra and i spoke offline just now. >>> >>> Below is a quick recap of our discussion on how they've achieved >>> acceptable performance from Spark on the user request/response path (@mark- >>> feel free to correct/comment). >>> >>> 1) there is a big difference in request/response latency between >>> submitting a full Spark Application (heavy weight) versus having a >>> long-running Spark Application (like Spark Job Server) that submits >>> lighter-weight Jobs using a shared SparkContext. mark is obviously using >>> the latter - a long-running Spark App. >>> >>> 2) there are some enhancements to Spark that are required to achieve >>> acceptable user request/response times. some links that Mark provided are >>> as follows: >>> >>> https://issues.apache.org/jira/browse/SPARK-11838 >>> https://github.com/apache/spark/pull/11036 >>> https://github.com/apache/spark/pull/11403 >>> https://issues.apache.org/jira/browse/SPARK-13523 >>> https://issues.apache.org/jira/browse/SPARK-13756 >>> >>> Essentially, a deeper level of caching at the shuffle file layer to >>> reduce compute and memory between queries. >>> >>> Note that Mark is running a slightly-modified version of stock Spark. >>> (He's mentioned this in prior posts, as well.) >>> >>> And I have to say that I'm, personally, seeing more and more >>> slightly-modified versions of Spark being deployed to production to >>> workaround outstanding PR's and Jiras.
>>> >>> this may not be what people want to hear, but it's a trend that i'm >>> seeing lately as more and more customize Spark to their specific use cases. >>> >>> Anyway, thanks for the good discussion, everyone! This is why we have >>> these lists, right! :) >>> >>> >>> On Thu, Mar 10, 2016 at 7:51 PM, Evan Chan <velvia.git...@gmail.com> >>> wrote: >>>> >>>> One of the premises here is that if you can restrict your workload to >>>> fewer cores - which is easier with FiloDB and careful data modeling - >>>> you can make this work for much higher concurrency and lower latency >>>> than most typical Spark use cases. >>>> >>>> The reason why it typically does not work in production is that most >>>> people are using HDFS and files. These data sources are designed for >>>> running que
Re: Can we use spark inside a web service?
At least for simple queries, the DAGScheduler does not appear to be the bottleneck - since we are able to schedule 700 queries, and all the scheduling is probably done from the main application thread. However, I did have high hopes for Sparrow. What was the reason they decided not to include that? On Fri, Mar 11, 2016 at 1:52 AM, Hemant Bhanawat <hemant9...@gmail.com> wrote: > Spark-jobserver is an elegant product that builds concurrency on top of > Spark. But, the current design of DAGScheduler prevents Spark from becoming a > truly concurrent solution for low latency queries. DagScheduler will turn > out to be a bottleneck for low latency queries. Sparrow project was an > effort to make Spark more suitable for such scenarios but it never made it > to the Spark codebase. If Spark has to become a highly concurrent solution, > scheduling has to be distributed. > > Hemant Bhanawat > www.snappydata.io > > On Fri, Mar 11, 2016 at 7:02 AM, Chris Fregly <ch...@fregly.com> wrote: >> >> great discussion, indeed. >> >> Mark Hamstra and i spoke offline just now. >> >> Below is a quick recap of our discussion on how they've achieved >> acceptable performance from Spark on the user request/response path (@mark- >> feel free to correct/comment). >> >> 1) there is a big difference in request/response latency between >> submitting a full Spark Application (heavy weight) versus having a >> long-running Spark Application (like Spark Job Server) that submits >> lighter-weight Jobs using a shared SparkContext. mark is obviously using >> the latter - a long-running Spark App. >> >> 2) there are some enhancements to Spark that are required to achieve >> acceptable user request/response times. some links that Mark provided are >> as follows: >> >> https://issues.apache.org/jira/browse/SPARK-11838 >> https://github.com/apache/spark/pull/11036 >> https://github.com/apache/spark/pull/11403 >> https://issues.apache.org/jira/browse/SPARK-13523 >> https://issues.apache.org/jira/browse/SPARK-13756 >> >> Essentially, a deeper level of caching at the shuffle file layer to reduce >> compute and memory between queries. >> >> Note that Mark is running a slightly-modified version of stock Spark. >> (He's mentioned this in prior posts, as well.) >> >> And I have to say that I'm, personally, seeing more and more >> slightly-modified versions of Spark being deployed to production to >> workaround outstanding PR's and Jiras. >> >> this may not be what people want to hear, but it's a trend that i'm seeing >> lately as more and more customize Spark to their specific use cases. >> >> Anyway, thanks for the good discussion, everyone! This is why we have >> these lists, right! :) >> >> >> On Thu, Mar 10, 2016 at 7:51 PM, Evan Chan <velvia.git...@gmail.com> >> wrote: >>> >>> One of the premises here is that if you can restrict your workload to >>> fewer cores - which is easier with FiloDB and careful data modeling - >>> you can make this work for much higher concurrency and lower latency >>> than most typical Spark use cases. >>> >>> The reason why it typically does not work in production is that most >>> people are using HDFS and files. These data sources are designed for >>> running queries and workloads on all your cores across many workers, >>> and not for filtering your workload down to only one or two cores. >>> >>> There is actually nothing inherent in Spark that prevents people from >>> using it as an app server. However, the insistence on using it with >>> HDFS is what kills concurrency. This is why FiloDB is important.
>>> >>> I agree there are more optimized stacks for running app servers, but >>> the choices that you mentioned: ES is targeted at text search; Cass >>> and HBase by themselves are not fast enough for analytical queries >>> that the OP wants; and MySQL is great but not scalable. Probably >>> something like VectorWise, HANA, Vertica would work well, but those >>> are mostly not free solutions. Druid could work too if the use case >>> is right. >>> >>> Anyways, great discussion! >>> >>> On Thu, Mar 10, 2016 at 2:46 PM, Chris Fregly <ch...@fregly.com> wrote: >>> > you are correct, mark. i misspoke. apologies for the confusion. >>> > >>> > so the problem is even worse given that a typical job requires multiple >>> > tasks/cores. >>> > >>> > i have yet to see this particular architecture w
Re: Can we use spark inside a web service?
One of the premises here is that if you can restrict your workload to fewer cores - which is easier with FiloDB and careful data modeling - you can make this work for much higher concurrency and lower latency than most typical Spark use cases. The reason why it typically does not work in production is that most people are using HDFS and files. These data sources are designed for running queries and workloads on all your cores across many workers, and not for filtering your workload down to only one or two cores. There is actually nothing inherent in Spark that prevents people from using it as an app server. However, the insistence on using it with HDFS is what kills concurrency. This is why FiloDB is important. I agree there are more optimized stacks for running app servers, but the choices that you mentioned: ES is targeted at text search; Cass and HBase by themselves are not fast enough for analytical queries that the OP wants; and MySQL is great but not scalable. Probably something like VectorWise, HANA, Vertica would work well, but those are mostly not free solutions. Druid could work too if the use case is right. Anyways, great discussion! On Thu, Mar 10, 2016 at 2:46 PM, Chris Fregly <ch...@fregly.com> wrote: > you are correct, mark. i misspoke. apologies for the confusion. > > so the problem is even worse given that a typical job requires multiple > tasks/cores. > > i have yet to see this particular architecture work in production. i would > love for someone to prove otherwise. > > On Thu, Mar 10, 2016 at 5:44 PM, Mark Hamstra <m...@clearstorydata.com> > wrote: >>> >>> For example, if you're looking to scale out to 1000 concurrent requests, >>> this is 1000 concurrent Spark jobs. This would require a cluster with 1000 >>> cores. >> >> >> This doesn't make sense. A Spark Job is a driver/DAGScheduler concept >> without any 1:1 correspondence between Worker cores and Jobs. Cores are >> used to run Tasks, not Jobs. So, yes, a 1000 core cluster can run at most >> 1000 simultaneous Tasks, but that doesn't really tell you anything about how >> many Jobs are or can be concurrently tracked by the DAGScheduler, which will >> be apportioning the Tasks from those concurrent Jobs across the available >> Executor cores. >> >> On Thu, Mar 10, 2016 at 2:00 PM, Chris Fregly <ch...@fregly.com> wrote: >>> >>> Good stuff, Evan. Looks like this is utilizing the in-memory >>> capabilities of FiloDB which is pretty cool. looking forward to the webcast >>> as I don't know much about FiloDB. >>> >>> My personal thoughts here are to removed Spark from the user >>> request/response hot path. >>> >>> I can't tell you how many times i've had to unroll that architecture at >>> clients - and replace with a real database like Cassandra, ElasticSearch, >>> HBase, MySql. >>> >>> Unfortunately, Spark - and Spark Streaming, especially - lead you to >>> believe that Spark could be used as an application server. This is not a >>> good use case for Spark. >>> >>> Remember that every job that is launched by Spark requires 1 CPU core, >>> some memory, and an available Executor JVM to provide the CPU and memory. >>> >>> Yes, you can horizontally scale this because of the distributed nature of >>> Spark, however it is not an efficient scaling strategy. >>> >>> For example, if you're looking to scale out to 1000 concurrent requests, >>> this is 1000 concurrent Spark jobs. This would require a cluster with 1000 >>> cores. this is just not cost effective. 
>>> >>> Use Spark for what it's good for - ad-hoc, interactive, and iterative >>> (machine learning, graph) analytics. Use an application server for what >>> it's good at - managing a large number of concurrent requests. And use a >>> database for what it's good for - storing/retrieving data. >>> >>> And any serious production deployment will need failover, throttling, >>> back pressure, auto-scaling, and service discovery. >>> >>> While Spark supports these to varying levels of production-readiness, >>> Spark is a batch-oriented system and not meant to be put on the user >>> request/response hot path. >>> >>> For the failover, throttling, back pressure, autoscaling that i mentioned >>> above, it's worth checking out the suite of Netflix OSS - particularly >>> Hystrix, Eureka, Zuul, Karyon, etc: http://netflix.github.io/ >>> >>> Here's my github project that incorporates a lot of these: >>> https://github.com/cfr
Achieving 700 Spark SQL Queries Per Second
Hey folks, I just saw a recent thread on here (but can't find it anymore) on using Spark as a web-speed query engine. I want to let you guys know that this is definitely possible! Most folks don't realize how low-latency Spark can actually be. Please check out my blog post below on achieving 700 queries per second in Spark: http://velvia.github.io/Spark-Concurrent-Fast-Queries/ Would love your feedback. thanks, Evan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Construct model matrix from SchemaRDD automatically
Hi Wush, I'm CC'ing user@spark.apache.org (which is the new list) and BCC'ing u...@spark.incubator.apache.org. In Spark 1.3, schemaRDD is in fact being renamed to DataFrame (see: https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html ) As for a model.matrix, you might have a look at the new pipelines API in spark 1.2 (to be further improved in 1.3) which provides facilities for repeatable data transformation as input to ML algorithms. That said - something to handle the case of automatically one-hot encoding all the categorical variables in a DataFrame might be a welcome addition. - Evan On Thu, Mar 5, 2015 at 8:43 PM, Wush Wu w...@bridgewell.com wrote: Dear all, I am a new spark user from R. After exploring the schemaRDD, I notice that it is similar to data.frame. Is there a feature like `model.matrix` in R to convert schemaRDD to model matrix automatically according to the type without explicitly converting them one by one? Thanks, Wush
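In the meantime, a hedged sketch of approximating model.matrix with the pipelines API (the feature transformers shown here landed in releases after the 1.2/1.3 versions discussed, and all column names are illustrative):

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categorical_cols = ["color", "shape"]  # hypothetical categorical columns
numeric_cols = ["age"]                 # hypothetical numeric column
stages = []
for c in categorical_cols:
    # Index each categorical column, then one-hot encode the indices.
    stages.append(StringIndexer(inputCol=c, outputCol=c + "_idx"))
    stages.append(OneHotEncoder(inputCol=c + "_idx", outputCol=c + "_vec"))
# Assemble the encoded categoricals plus numerics into one feature vector.
stages.append(VectorAssembler(
    inputCols=[c + "_vec" for c in categorical_cols] + numeric_cols,
    outputCol="features"))
model_matrix_df = Pipeline(stages=stages).fit(df).transform(df)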
Re: Spark on teradata?
Have you taken a look at the TeradataDBInputFormat? Spark is compatible with arbitrary hadoop input formats - so this might work for you: http://developer.teradata.com/extensibility/articles/hadoop-mapreduce-connector-to-teradata-edw On Thu, Jan 8, 2015 at 10:53 AM, gen tang gen.tan...@gmail.com wrote: Thanks a lot for your reply. In fact, I need to work on almost all the data in teradata (~100T). So, I don't think that jdbcRDD is a good choice. Cheers Gen On Thu, Jan 8, 2015 at 7:39 PM, Reynold Xin r...@databricks.com wrote: Depending on your use cases. If the use case is to extract small amount of data out of teradata, then you can use the JdbcRDD and soon a jdbc input source based on the new Spark SQL external data source API. On Wed, Jan 7, 2015 at 7:14 AM, gen tang gen.tan...@gmail.com wrote: Hi, I have a stupid question: Is it possible to use spark on Teradata data warehouse, please? I read some news on internet which say yes. However, I didn't find any example about this issue Thanks in advance. Cheers Gen
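For the small-extract case Reynold mentions, a hedged sketch using the JDBC data source that arrived in later Spark SQL releases (the driver class and URL shape are Teradata-specific assumptions; verify them against your environment):

# Pull a table (or a pushed-down subquery) from Teradata into a DataFrame.
df = sqlContext.read.format("jdbc").options(
    url="jdbc:teradata://tdhost/DATABASE=mydb",  # hypothetical host/database
    dbtable="mytable",
    driver="com.teradata.jdbc.TeraDriver",
).load()
df.registerTempTable("mytable")  # then query it with Spark SQL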
Re: Spark and Stanford CoreNLP
Chris, Thanks for stopping by! Here's a simple example. Imagine I've got a corpus of data, which is an RDD[String], and I want to do some POS tagging on it. In naive spark, that might look like this: val props = new Properties(); props.setProperty("annotators", "pos"); val proc = new StanfordCoreNLP(props); val data = sc.textFile("hdfs://some/distributed/corpus"); def processData(s: String): Annotation = { val a = new Annotation(s); proc.annotate(a); a }; val processedData = data.map(processData) // Note that this is actually executed lazily. Under the covers, spark takes the closure (processData), serializes it and all objects/methods that it references (including the proc), and ships the serialized closure off to workers so that they can run it on their local partitions of the corpus. The issue at hand is that since the StanfordCoreNLP object isn't serializable, *this will fail at runtime.* Hence the solutions to this problem suggested in this thread, which all come down to initializing the processor on the worker side (preferably once). Your intuition about not wanting to serialize huge objects is fine. This issue is not unique to CoreNLP - any Java library which has non-serializable objects will face this issue. HTH, Evan On Tue, Nov 25, 2014 at 8:05 AM, Christopher Manning mann...@stanford.edu wrote: I'm not (yet!) an active Spark user, but saw this thread on twitter … and am involved with Stanford CoreNLP. Could someone explain how things need to be to work better with Spark — since that would be a useful goal? That is, while Stanford CoreNLP is not quite uniform (being developed by various people for over a decade), the general approach has always been that models should be serializable but that processors should not be. This makes sense to me intuitively. It doesn't really make sense to serialize a processor, which often has large mutable data structures used for processing. But does that not work well with Spark? Do processors need to be serializable, and then one needs to go through and make all the elements of the processor transient? Or what? Thanks! Chris On Nov 25, 2014, at 7:54 AM, Evan Sparks evan.spa...@gmail.com wrote: If you only mark it as transient, then the object won't be serialized, and on the worker the field will be null. When the worker goes to use it, you get an NPE. Marking it lazy defers initialization to first use. If that use happens to be after serialization time (e.g. on the worker), then the worker will first check to see if it's initialized, and then initialize it if not. I think if you *do* reference the lazy val before serializing you will likely get an NPE. On Nov 25, 2014, at 1:05 AM, Theodore Vasiloudis theodoros.vasilou...@gmail.com wrote: Great, Ian's approach seems to work fine. Can anyone provide an explanation as to why this works, but passing the CoreNLP object itself as transient does not? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Stanford-CoreNLP-tp19654p19739.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark and Stanford CoreNLP
We have gotten this to work, but it requires instantiating the CoreNLP object on the worker side. Because of the initialization time it makes a lot of sense to do this inside of a .mapPartitions instead of a .map, for example. As an aside, if you're using it from Scala, have a look at sistanlp, which provided a nicer, scala-friendly interface to CoreNLP. On Nov 24, 2014, at 7:46 AM, tvas theodoros.vasilou...@gmail.com wrote: Hello, I was wondering if anyone has gotten the Stanford CoreNLP Java library to work with Spark. My attempts to use the parser/annotator fail because of task serialization errors since the class StanfordCoreNLP cannot be serialized. I've tried the remedies of registering StanfordCoreNLP through kryo, as well as using chill.MeatLocker, but these still produce serialization errors. Passing the StanfordCoreNLP object as transient leads to a NullPointerException instead. Has anybody managed to get this work? Regards, Theodore -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Stanford-CoreNLP-tp19654.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
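The same initialize-once-per-partition idea, sketched generically in PySpark (load_model and annotate are hypothetical stand-ins for any heavy, non-serializable processor such as CoreNLP):

def process_partition(rows):
    model = load_model()   # built once per partition, never shipped over the wire
    for row in rows:
        yield model.annotate(row)

processed = corpus_rdd.mapPartitions(process_partition)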
Re: Spark and Stanford CoreNLP
This is probably not the right venue for general questions on CoreNLP - the project website (http://nlp.stanford.edu/software/corenlp.shtml) provides documentation and links to mailing lists/stack overflow topics. On Mon, Nov 24, 2014 at 9:08 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hello, I'm new to Stanford CoreNLP. Could any one share good training material and examples(java or scala) on NLP. Regards, Rajesh On Mon, Nov 24, 2014 at 9:38 PM, Ian O'Connell i...@ianoconnell.com wrote: object MyCoreNLP { @transient lazy val coreNLP = new coreNLP() } and then refer to it from your map/reduce/map partitions or that it should be fine (presuming its thread safe), it will only be initialized once per classloader per jvm On Mon, Nov 24, 2014 at 7:58 AM, Evan Sparks evan.spa...@gmail.com wrote: We have gotten this to work, but it requires instantiating the CoreNLP object on the worker side. Because of the initialization time it makes a lot of sense to do this inside of a .mapPartitions instead of a .map, for example. As an aside, if you're using it from Scala, have a look at sistanlp, which provided a nicer, scala-friendly interface to CoreNLP. On Nov 24, 2014, at 7:46 AM, tvas theodoros.vasilou...@gmail.com wrote: Hello, I was wondering if anyone has gotten the Stanford CoreNLP Java library to work with Spark. My attempts to use the parser/annotator fail because of task serialization errors since the class StanfordCoreNLP cannot be serialized. I've tried the remedies of registering StanfordCoreNLP through kryo, as well as using chill.MeatLocker, but these still produce serialization errors. Passing the StanfordCoreNLP object as transient leads to a NullPointerException instead. Has anybody managed to get this work? Regards, Theodore -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Stanford-CoreNLP-tp19654.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Mllib native netlib-java/OpenBLAS
Additionally - I strongly recommend using OpenBLAS over the Atlas build from the default Ubuntu repositories. Alternatively, you can build ATLAS on the hardware you're actually going to be running the matrix ops on (the master/workers), but we've seen modest performance gains doing this vs. OpenBLAS, at least on the bigger EC2 machines (e.g. cc2.8xlarge, c3.8xlarge). On Mon, Nov 24, 2014 at 11:26 AM, Xiangrui Meng men...@gmail.com wrote: Try building Spark with -Pnetlib-lgpl, which includes the JNI library in the Spark assembly jar. This is the simplest approach. If you want to include it as part of your project, make sure the library is inside the assembly jar or you specify it via `--jars` with spark-submit. -Xiangrui On Mon, Nov 24, 2014 at 8:51 AM, agg212 alexander_galaka...@brown.edu wrote: Hi, i'm trying to improve performance for Spark's Mllib, and I am having trouble getting native netlib-java libraries installed/recognized by Spark. I am running on a single machine, Ubuntu 14.04, and here is what I've tried: sudo apt-get install libgfortran3 sudo apt-get install libatlas3-base libopenblas-base (this is how netlib-java's website says to install it) I also double checked and it looks like the libraries are linked correctly in /usr/lib (see below): /usr/lib/libblas.so.3 -> /etc/alternatives/libblas.so.3 /usr/lib/liblapack.so.3 -> /etc/alternatives/liblapack.so.3 The Dependencies section on Spark's Mllib website also says to include com.github.fommil.netlib:all:1.1.2 as a dependency. I therefore tried adding this to my sbt file like so: libraryDependencies += "com.github.fommil.netlib" % "all" % "1.1.2" After all this, i'm still seeing the following error message. Does anyone have more detailed installation instructions? 14/11/24 16:49:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 14/11/24 16:49:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-native-netlib-java-OpenBLAS-tp19662.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark and Stanford CoreNLP
Neat hack! This is cute and actually seems to work. The fact that it works is a little surprising and somewhat unintuitive. On Mon, Nov 24, 2014 at 8:08 AM, Ian O'Connell i...@ianoconnell.com wrote: object MyCoreNLP { @transient lazy val coreNLP = new coreNLP() } and then refer to it from your map/reduce/map partitions or that it should be fine (presuming its thread safe), it will only be initialized once per classloader per jvm On Mon, Nov 24, 2014 at 7:58 AM, Evan Sparks evan.spa...@gmail.com wrote: We have gotten this to work, but it requires instantiating the CoreNLP object on the worker side. Because of the initialization time it makes a lot of sense to do this inside of a .mapPartitions instead of a .map, for example. As an aside, if you're using it from Scala, have a look at sistanlp, which provided a nicer, scala-friendly interface to CoreNLP. On Nov 24, 2014, at 7:46 AM, tvas theodoros.vasilou...@gmail.com wrote: Hello, I was wondering if anyone has gotten the Stanford CoreNLP Java library to work with Spark. My attempts to use the parser/annotator fail because of task serialization errors since the class StanfordCoreNLP cannot be serialized. I've tried the remedies of registering StanfordCoreNLP through kryo, as well as using chill.MeatLocker, but these still produce serialization errors. Passing the StanfordCoreNLP object as transient leads to a NullPointerException instead. Has anybody managed to get this work? Regards, Theodore -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Stanford-CoreNLP-tp19654.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Mllib native netlib-java/OpenBLAS
You can try recompiling spark with that option, and doing an sbt/sbt publish-local, then change your spark version from 1.1.0 to 1.2.0-SNAPSHOT (assuming you're building from the 1.1 branch) - sbt or maven (whichever you're compiling your app with) will pick up the version of spark that you just built. On Mon, Nov 24, 2014 at 6:31 PM, agg212 alexander_galaka...@brown.edu wrote: I am running it in local. How can I use the built version (in local mode) so that I can use the native libraries? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-native-netlib-java-OpenBLAS-tp19662p19705.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SparkSQL - can we add new column(s) to parquet files
I would expect an SQL query on c to fail because c would not be known in the schema of the older Parquet file. What I'd be very interested in is how to add a new column as an incremental new parquet file, and be able to somehow join the existing and new file, in an efficient way. I.e., somehow guarantee that for every row in the old parquet file, the corresponding rows in the new file would be stored in the same node, so that joins are local. On Fri, Nov 21, 2014 at 10:03 AM, Sadhan Sood sadhan.s...@gmail.com wrote: We create the table definition by reading the parquet file for schema and store it in hive metastore. But if someone adds a new column to the schema, and if we rescan the schema from the new parquet files and update the table definition, would it still work if we run queries on the table ? So, old table has - Int a, Int b new table - Int a, Int b, String c but older parquet files don't have String c, so on querying the table would it return me null for column c from older files and data from newer files or fail?
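A hedged sketch of the first scenario, using the schema-merging option that appeared in later Spark SQL releases (the path is illustrative): rows coming from files written before column c existed simply surface as null.

df = sqlContext.read.option("mergeSchema", "true").parquet("/data/mytable")
df.select("a", "b", "c").show()  # "c" is null for rows from the older files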
Re: Best practice for multi-user web controller in front of Spark
For sharing RDDs across multiple jobs - you could also have a look at Tachyon. It provides an HDFS compatible in-memory storage layer that keeps data in memory across multiple jobs/frameworks - http://tachyon-project.org/ . - On Tue, Nov 11, 2014 at 8:11 AM, Sonal Goyal sonalgoy...@gmail.com wrote: I believe the Spark Job Server by Ooyala can help you share data across multiple jobs, take a look at http://engineering.ooyala.com/blog/open-sourcing-our-spark-job-server. It seems to fit closely to what you need. Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Tue, Nov 11, 2014 at 7:20 PM, bethesda swearinge...@mac.com wrote: We are relatively new to spark and so far have been manually submitting single jobs at a time for ML training, during our development process, using spark-submit. Each job accepts a small user-submitted data set and compares it to every data set in our hdfs corpus, which only changes incrementally on a daily basis. (that detail is relevant to question 3 below) Now we are ready to start building out the front-end, which will allow a team of data scientists to submit their problems to the system via a web front-end (web tier will be java). Users could of course be submitting jobs more or less simultaneously. We want to make sure we understand how to best structure this. Questions: 1 - Does a new SparkContext get created in the web tier for each new request for processing? 2 - If so, how much time should we expect it to take for setting up the context? Our goal is to return a response to the users in under 10 seconds, but if it takes many seconds to create a new context or otherwise set up the job, then we need to adjust our expectations for what is possible. From using spark-shell one might conclude that it might take more than 10 seconds to create a context, however it's not clear how much of that is context-creation vs other things. 3 - (This last question perhaps deserves a post in and of itself:) if every job is always comparing some little data structure to the same HDFS corpus of data, what is the best pattern to use to cache the RDD's from HDFS so they don't have to always be re-constituted from disk? I.e. how can RDD's be shared from the context of one job to the context of subsequent jobs? Or does something like memcache have to be used? Thanks! David -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-multi-user-web-controller-in-front-of-Spark-tp18581.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: word2vec: how to save an mllib model and reload it?
There are a few examples where this is the case. Let's take ALS, where the result is a MatrixFactorizationModel, which is assumed to be big - the model consists of two matrices, one (users x k) and one (k x products). These are represented as RDDs. You can save these RDDs out to disk by doing something like model.userFeatures.saveAsObjectFile(...) and model.productFeatures.saveAsObjectFile(...) to save out to HDFS or Tachyon or S3. Then, when you want to reload you'd have to instantiate them into a class of MatrixFactorizationModel. That class is package private to MLlib right now, so you'd need to copy the logic over to a new class, but that's the basic idea. That said - using spark to serve these recommendations on a point-by-point basis might not be optimal. There's some work going on in the AMPLab to address this issue. On Fri, Nov 7, 2014 at 7:44 AM, Duy Huynh duy.huynh@gmail.com wrote: you're right, serialization works. what is your suggestion on saving a distributed model? so part of the model is in one cluster, and some other parts of the model are in other clusters. during runtime, these sub-models run independently in their own clusters (load, train, save). and at some point during run time these sub-models merge into the master model, which also loads, trains, and saves at the master level. much appreciated. On Fri, Nov 7, 2014 at 2:53 AM, Evan R. Sparks evan.spa...@gmail.com wrote: There's some work going on to support PMML - https://issues.apache.org/jira/browse/SPARK-1406 - but it's not yet been merged into master. What are you used to doing in other environments? In R I'm used to running save(), same with matlab. In python either pickling things or dumping to json seems pretty common. (even the scikit-learn docs recommend pickling - http://scikit-learn.org/stable/modules/model_persistence.html). These all seem basically equivalent java serialization to me.. Would some helper functions (in, say, mllib.util.modelpersistence or something) make sense to add? On Thu, Nov 6, 2014 at 11:36 PM, Duy Huynh duy.huynh@gmail.com wrote: that works. is there a better way in spark? this seems like the most common feature for any machine learning work - to be able to save your model after training it and load it later. On Fri, Nov 7, 2014 at 2:30 AM, Evan R. Sparks evan.spa...@gmail.com wrote: Plain old java serialization is one straightforward approach if you're in java/scala. On Thu, Nov 6, 2014 at 11:26 PM, ll duy.huynh@gmail.com wrote: what is the best way to save an mllib model that you just trained and reload it in the future? specifically, i'm using the mllib word2vec model... thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-how-to-save-an-mllib-model-and-reload-it-tp18329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
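A hedged PySpark rendering of that idea (paths are illustrative; reconstructing the model wrapper itself is the part that needs the package-private copy described above):

model.userFeatures().saveAsPickleFile("hdfs:///models/als/user")
model.productFeatures().saveAsPickleFile("hdfs:///models/als/product")

# Later: reload the factor RDDs and score manually, e.g. via a per-pair dot product.
user_features = sc.pickleFile("hdfs:///models/als/user")
product_features = sc.pickleFile("hdfs:///models/als/product")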
Re: why decision trees do binary split?
You can imagine this same logic applying to the continuous case. E.g. what if all the quartiles or deciles of a particular value have different behavior - this could capture that too. Or what if some combination of features was highly discriminative, but only into n buckets rather than two... you can see there are lots of different options here. In general in MLlib, we're trying to support widely accepted and frequently used ML models, and simply offer a platform to efficiently train these with spark. While decision trees with n-ary splits might be a sensible thing to explore, they are not widely used in practice, and I'd want to see some compelling results from proper ML/stats researchers before shipping them as a default feature. If you're looking for a way to control variance and pick up nuance in your dataset that's not covered by plain decision trees, I recommend looking at Random Forests - a well studied extension to decision trees that's also widely used in practice - and coming to MLlib soon! On Thu, Nov 6, 2014 at 3:29 AM, Tamas Jambor jambo...@gmail.com wrote: Thanks for the reply, Sean. I can see that splitting on all the categories would probably overfit the tree, on the other hand, it might give more insight on the subcategories (probably only would work if the data is uniformly distributed between the categories). I haven't really found any comparison between the two methods in terms of performance and interpretability. thanks, On Thu, Nov 6, 2014 at 9:46 AM, Sean Owen so...@cloudera.com wrote: I haven't seen that done before, which may be most of the reason - I am not sure that is common practice. I can see upsides - you need not pick candidate splits to test since there is only one N-way rule possible. The binary split equivalent is N levels instead of 1. The big problem is that you are always segregating the data set entirely, and making the equivalent of those N binary rules, even when you would not otherwise bother because they don't add information about the target. The subsets matching each child are therefore unnecessarily small and this makes learning on each independent subset weaker. On Nov 6, 2014 9:36 AM, jamborta jambo...@gmail.com wrote: I meant above, that in the case of categorical variables it might be more efficient to create a node on each categorical value. Is there a reason why spark went down the binary route? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-decision-trees-do-binary-split-tp18188p18265.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: word2vec: how to save an mllib model and reload it?
Plain old java serialization is one straightforward approach if you're in java/scala. On Thu, Nov 6, 2014 at 11:26 PM, ll duy.huynh@gmail.com wrote: what is the best way to save an mllib model that you just trained and reload it in the future? specifically, i'm using the mllib word2vec model... thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-how-to-save-an-mllib-model-and-reload-it-tp18329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: word2vec: how to save an mllib model and reload it?
There's some work going on to support PMML - https://issues.apache.org/jira/browse/SPARK-1406 - but it's not yet been merged into master. What are you used to doing in other environments? In R I'm used to running save(), same with matlab. In python either pickling things or dumping to json seems pretty common. (even the scikit-learn docs recommend pickling - http://scikit-learn.org/stable/modules/model_persistence.html). These all seem basically equivalent to java serialization to me. Would some helper functions (in, say, mllib.util.modelpersistence or something) make sense to add? On Thu, Nov 6, 2014 at 11:36 PM, Duy Huynh duy.huynh@gmail.com wrote: that works. is there a better way in spark? this seems like the most common feature for any machine learning work - to be able to save your model after training it and load it later. On Fri, Nov 7, 2014 at 2:30 AM, Evan R. Sparks evan.spa...@gmail.com wrote: Plain old java serialization is one straightforward approach if you're in java/scala. On Thu, Nov 6, 2014 at 11:26 PM, ll duy.huynh@gmail.com wrote: what is the best way to save an mllib model that you just trained and reload it in the future? specifically, i'm using the mllib word2vec model... thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-how-to-save-an-mllib-model-and-reload-it-tp18329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
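For completeness, the pickling route in Python looks like the following (trained_model is a hypothetical pure-Python object; PySpark's JVM-backed models need the Java-serialization approach discussed above instead):

import pickle

# Save the model to disk...
with open("model.pkl", "wb") as f:
    pickle.dump(trained_model, f)

# ...and load it back later.
with open("model.pkl", "rb") as f:
    restored_model = pickle.load(f)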
Re: Multitenancy in Spark - within/across spark context
Ashwin, I would say the strategies in general are: 1) Have each user submit a separate Spark app (each its own Spark Context), with its own resource settings, and share data through HDFS or something like Tachyon for speed. 2) Share a single spark context amongst multiple users, using the fair scheduler. This is sort of like having a Hadoop resource pool. It has some obvious HA/SPOF issues, namely that if the context dies then every user using it is also dead. Also, sharing RDDs in cached memory has the same resiliency problems, namely that if any executor dies then Spark must recompute / rebuild the RDD (it tries to only rebuild the missing part, but sometimes it must rebuild everything). Job server can help with 1 or 2, 2 in particular. If you have any questions about job server, feel free to ask at the spark-jobserver google group. I am the maintainer. -Evan On Thu, Oct 23, 2014 at 1:06 PM, Marcelo Vanzin van...@cloudera.com wrote: You may want to take a look at https://issues.apache.org/jira/browse/SPARK-3174. On Thu, Oct 23, 2014 at 2:56 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Upvote for the multitenancy requirement. I'm also building a data analytic platform and there'll be multiple users running queries and computations simultaneously. One of the pain points is control of resource size. Users don't really know how many nodes they need, they always use as much as possible... The result is lots of wasted resources in our Yarn cluster. A way to 1) allow multiple spark contexts to share the same resources or 2) add dynamic resource management for Yarn mode is very much wanted. Jianshi On Thu, Oct 23, 2014 at 5:36 AM, Marcelo Vanzin van...@cloudera.com wrote: On Wed, Oct 22, 2014 at 2:17 PM, Ashwin Shankar ashwinshanka...@gmail.com wrote: That's not something you might want to do usually. In general, a SparkContext maps to a user application My question was basically this. In this page in the official doc, under Scheduling within an application section, it talks about multiuser and fair sharing within an app. How does multiuser within an application work (how users connect to an app, run their stuff)? When would I want to use this ? I see. The way I read that page is that Spark supports all those scheduling options; but Spark doesn't give you the means to actually be able to submit jobs from different users to a running SparkContext hosted on a different process. For that, you'll need something like the job server that I referenced before, or write your own framework for supporting that. Personally, I'd use the information on that page when dealing with concurrent jobs in the same SparkContext, but still restricted to the same user. I'd avoid trying to create any application where a single SparkContext is trying to be shared by multiple users in any way. As far as I understand, this will cause executors to be killed, which means that Spark will start retrying tasks to rebuild the data that was held by those executors when needed. I basically wanted to find out if there were any gotchas related to preemption on Spark. Things like say half of an application's executors got preempted say while doing reduceByKey, will the application progress with the remaining resources/fair share ? Jobs should still make progress as long as at least one executor is available. The gotcha would be the one I mentioned, where Spark will fail your job after x executors failed, which might be a common occurrence when preemption is enabled.
That being said, it's a configurable option, so you can set x to a very large value and your job should keep on chugging along. The options you'd want to take a look at are: spark.task.maxFailures and spark.yarn.max.executor.failures -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Marcelo - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
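Returning to strategy (2) above, a minimal sketch of a shared context with the fair scheduler enabled (both settings are standard Spark configuration; the pool name is illustrative):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("shared-context").set("spark.scheduler.mode", "FAIR")
sc = SparkContext(conf=conf)
# Tag each user's jobs into their own fair-scheduler pool:
sc.setLocalProperty("spark.scheduler.pool", "user_a")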
Re: MLlib linking error Mac OS X
MLlib relies on breeze for much of its linear algebra, which in turn relies on netlib-java. netlib-java will attempt to load a native BLAS at runtime and then attempt to load its own precompiled version. Failing that, it will default back to a Java version that it has built in. The Java version can be about as fast as a native version for certain operations that are tricky to optimize (like vector dot products), but MUCH slower for things like matrix/matrix multiply. Luckily - the code will still work without the native libraries installed, it will just be slower in some situations. So, you can safely ignore the warnings if all you care about is correctness. The MLlib docs (https://spark.apache.org/docs/latest/mllib-guide.html) provide guidance about how to link against the native libraries in your application - this will make the warning messages go away and might speed up your program. - Evan On Oct 20, 2014, at 3:54 AM, npomfret nick-nab...@snowmonkey.co.uk wrote: I'm getting the same warning on my mac. Accompanied by what appears to be pretty low CPU usage (http://apache-spark-user-list.1001560.n3.nabble.com/mlib-model-build-and-low-CPU-usage-td16777.html), I wonder if they are connected? I've used jblas on a mac several times, it always just works perfectly with zero setup. Maybe the warning is misleading. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-linking-error-Mac-OS-X-tp588p16806.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark speed performance
How many files do you have and how big is each JSON object? Spark works better with a few big files vs many smaller ones. So you could try cat'ing your files together and rerunning the same experiment. - Evan On Oct 18, 2014, at 12:07 PM, jan.zi...@centrum.cz jan.zi...@centrum.cz wrote: Hi, I have a program for single-computer execution (in Python) and have also implemented the same for Spark. This program basically only reads .json, takes one field from it, and saves it back. Using Spark my program runs approximately 100 times slower on 1 master and 1 slave. So I would like to ask where the problem might be. My Spark program looks like: sc = SparkContext(appName="Json data preprocessor") distData = sc.textFile(sys.argv[2]) json_extractor = JsonExtractor(sys.argv[1]) cleanedData = distData.flatMap(json_extractor.extract_json) cleanedData.saveAsTextFile(sys.argv[3]) JsonExtractor only selects the data from the field that is given by sys.argv[1]. My data are basically many small json files, with one json object per line. I have tried both reading and writing the data from/to Amazon S3 and to local disk on all the machines. I would like to ask if there is something that I am missing or if Spark is supposed to be so slow in comparison with the local non-parallelized single-node program. Thank you in advance for any suggestions or hints. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
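Two easy things to try before re-architecting, sketched against the script above (the partition target of 64 is an arbitrary assumption; tune it to your cluster):

# Collapse the many tiny input splits into fewer partitions right after the read:
distData = sc.textFile(sys.argv[2]).coalesce(64)

# Or, if each file is one small JSON document, read whole files at once:
pairs = sc.wholeTextFiles(sys.argv[2])  # an RDD of (filename, content) pairs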
Re: where are my python lambda functions run in yarn-client mode?
It's true that it is an implementation detail, but it's a very important one to document because it has the possibility of changing results depending on when I use take or collect. The issue I was running into was when the executor had a different operating system than the driver, and I was using 'pipe' with a binary I compiled myself. I needed to make sure I used the binary compiled for the operating system I expect it to run on. So in cases where I was only interested in the first value, my code was breaking horribly on 1.0.2, but working fine on 1.1. My only suggestion would be to backport 'spark.localExecution.enabled' to the 1.0 line. Thanks for all your help! Evan On Fri, Oct 10, 2014 at 10:40 PM, Davies Liu dav...@databricks.com wrote: This is some kind of implementation detail, so it's not documented :-( If you think this is a blocker for you, you could create a JIRA, maybe it could be fixed in 1.0.3+. Davies On Fri, Oct 10, 2014 at 5:11 PM, Evan evan.sama...@gmail.com wrote: Thank you! I was looking for a config variable to that end, but I was looking in Spark 1.0.2 documentation, since that was the version I had the problem with. Is this behavior documented in 1.0.2's documentation? Evan On 10/09/2014 04:12 PM, Davies Liu wrote: When you call rdd.take() or rdd.first(), it may[1] execute the job locally (in the driver); otherwise, all the jobs are executed in the cluster. There is a config called `spark.localExecution.enabled` (since 1.1+) to change this; it's not enabled by default, so all the functions will be executed in the cluster. If you set this to `true`, then you get the same behavior as 1.0. [1] If it did not get enough items from the first partitions, it will try multiple partitions at a time, so they will be executed in the cluster. On Thu, Oct 9, 2014 at 12:14 PM, esamanas evan.sama...@gmail.com wrote: Hi, I am using pyspark and I'm trying to support both Spark 1.0.2 and 1.1.0 with my app, which will run in yarn-client mode. However, when I use 'map' to run a python lambda function over an RDD, it appears to run on different machines depending on the version, and this is causing problems. In both cases, I am using a Hadoop cluster that runs linux on all of its nodes. I am submitting my jobs with a machine running Mac OS X 10.9. As a reproducer, here is my script: import platform print sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0] The answer in Spark 1.1.0: 'Linux' The answer in Spark 1.0.2: 'Darwin' In other experiments I changed the size of the list that gets parallelized, thinking maybe 1.0.2 just runs jobs on the driver node if they're small enough. I got the same answer (with only 1 million numbers). This is a troubling difference. I would expect all functions run on an RDD to be executed on my worker nodes in the Hadoop cluster, but this is clearly not the case for 1.0.2. Why does this difference exist? How can I accurately detect which jobs will run where? Thank you, Evan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/where-are-my-python-lambda-functions-run-in-yarn-client-mode-tp16059.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
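For reference, pinning down the 1.1+ behavior explicitly is just a conf setting at context-creation time (a sketch; the flag is the one named in the thread):

from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.localExecution.enabled", "false")  # the 1.1+ default
sc = SparkContext(conf=conf)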
Re: where are my python lambda functions run in yarn-client mode?
Thank you! I was looking for a config variable to that end, but I was looking in the Spark 1.0.2 documentation, since that was the version I had the problem with. Is this behavior documented in 1.0.2's documentation? Evan On 10/09/2014 04:12 PM, Davies Liu wrote: When you call rdd.take() or rdd.first(), it may[1] execute the job locally (in the driver); otherwise, all the jobs are executed in the cluster. There is a config called `spark.localExecution.enabled` (since 1.1+) to change this. It's not enabled by default, so all the functions will be executed in the cluster. If you set this to `true`, then you get the same behavior as 1.0. [1] If it does not get enough items from the first partitions, it will try multiple partitions at a time, so they will be executed in the cluster. On Thu, Oct 9, 2014 at 12:14 PM, esamanas evan.sama...@gmail.com wrote: Hi, I am using pyspark and I'm trying to support both Spark 1.0.2 and 1.1.0 with my app, which will run in yarn-client mode. However, when I use 'map' to run a python lambda function over an RDD, the functions appear to be run on different machines, and this is causing problems. In both cases, I am using a Hadoop cluster that runs Linux on all of its nodes. I am submitting my jobs from a machine running Mac OS X 10.9. As a reproducer, here is my script:

import platform
print sc.parallelize([1]).map(lambda x: platform.system()).take(1)[0]

The answer in Spark 1.1.0: 'Linux' The answer in Spark 1.0.2: 'Darwin' In other experiments I changed the size of the list that gets parallelized, thinking maybe 1.0.2 just runs jobs on the driver node if they're small enough. I got the same answer (with only 1 million numbers). This is a troubling difference. I would expect all functions run on an RDD to be executed on my worker nodes in the Hadoop cluster, but this is clearly not the case for 1.0.2. Why does this difference exist? How can I accurately detect which jobs will run where? Thank you, Evan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/where-are-my-python-lambda-functions-run-in-yarn-client-mode-tp16059.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to run kmeans after pca?
Caching after doing the multiply is a good idea. Keep in mind that during the first iteration of KMeans, the cached rows haven't yet been materialized - so it is both doing the multiply and the first pass of KMeans all at once. To isolate which part is slow, you can run cachedRows.count() to force the rows to be materialized before you run KMeans. Also, KMeans is optimized to run quickly on both sparse and dense data. The result of PCA is going to be dense, but if your input data has #nnz ~= size(pca data), performance might be about the same. (I haven't actually verified this last point.) Finally, speed is partly going to depend on how much data you have relative to scheduler overheads - if your input data is small, it could be that the costs of distributing your task are greater than the time spent actually computing. Usually this manifests itself in the stages taking about the same amount of time even though you're passing datasets that have different dimensionality. On Tue, Sep 30, 2014 at 9:00 AM, st553 sthompson...@gmail.com wrote: Thanks for your response Burak, it was very helpful. I am noticing that if I run PCA before KMeans, the KMeans algorithm will actually take longer to run than if I had just run KMeans without PCA. I was hoping that by using PCA first it would actually speed up the KMeans algorithm. I have followed the steps you've outlined, but I'm wondering if I need to cache/persist the RDD[Vector] rows of the RowMatrix returned after multiplying. Something like:

val newData: RowMatrix = data.multiply(bcPrincipalComponents.value)
val cachedRows = newData.rows.persist()
KMeans.run(cachedRows)
cachedRows.unpersist()

It doesn't seem intuitive to me that a lower-dimensional version of my data set would take longer for KMeans... unless I'm missing something? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-run-kmeans-after-pca-tp14473p15409.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
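A sketch of the isolation step described above: force the multiply and the caching to happen before timing KMeans, so the first KMeans pass isn't charged for them. This assumes `data: RowMatrix` and `bcPrincipalComponents` from the quoted snippet; k and the iteration count are placeholders:

import org.apache.spark.mllib.clustering.KMeans

val projected = data.multiply(bcPrincipalComponents.value)
val cachedRows = projected.rows.persist()
cachedRows.count()                             // materializes the projected, cached rows
val model = KMeans.train(cachedRows, 10, 20)   // now times only the clustering itself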
Re: spark1.0 principal component analysis
In its current implementation, the principal components are computed in MLlib in two steps: 1) In a distributed fashion, compute the covariance matrix - the result is a local matrix. 2) On this local matrix, compute the SVD. The sorting comes from the SVD. If you want to get the eigenvalues out, you can simply run step 1 yourself on your RowMatrix via the (experimental) computeCovariance() method, and then run SVD on the result using a library like Breeze. - Evan On Tue, Sep 23, 2014 at 12:49 PM, st553 sthompson...@gmail.com wrote: sowen wrote: "it seems that the singular values from the SVD aren't returned, so I don't know that you can access this directly" It's not clear to me why these aren't returned. The S matrix would be useful to determine a reasonable value for K. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-principal-component-analysis-tp9249p14919.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
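A rough sketch of those two steps, assuming `mat` is your RowMatrix. The Breeze conversion relies on Spark's local Matrix being column-major, which toArray is; treat this as illustrative rather than the library's prescribed path:

import breeze.linalg.{svd, DenseMatrix => BDM}

val cov = mat.computeCovariance()   // step 1: distributed pass, local (dense) result
val covBM = new BDM[Double](cov.numRows, cov.numCols, cov.toArray)
val svd.SVD(_, s, _) = svd(covBM)   // step 2: local SVD via Breeze
// `s` holds the singular values in decreasing order; for a covariance matrix
// (symmetric positive semi-definite) these are exactly the eigenvalues.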
Re: Use Case of mutable RDD - any ideas around will help.
Sweet, that's probably it. Too bad it didn't seem to make 1.1? On Wed, Sep 17, 2014 at 5:32 PM, Michael Armbrust mich...@databricks.com wrote: The unknown slowdown might be addressed by https://github.com/apache/spark/commit/f858f466862541c3faad76a1fa2391f1c17ec9dd On Sun, Sep 14, 2014 at 10:40 PM, Evan Chan velvia.git...@gmail.com wrote: SPARK-1671 looks really promising. Note that even right now, you don't need to un-cache the existing table. You can do something like this:

newAdditionRdd.registerTempTable("table2")
sqlContext.cacheTable("table2")
val unionedRdd = sqlContext.table("table1").unionAll(sqlContext.table("table2"))

When you use table, it will return you the cached representation, so that the union executes much faster. However, there is some unknown slowdown; it's not quite as fast as what you would expect. On Fri, Sep 12, 2014 at 2:09 PM, Cheng Lian lian.cs@gmail.com wrote: Ah, I see. So basically what you need is something like cache write-through support, which exists in Shark but is not implemented in Spark SQL yet. In Shark, when inserting data into a table that has already been cached, the newly inserted data will be automatically cached and “union”-ed with the existing table content. SPARK-1671 was created to track this feature. We'll work on that. Currently, as a workaround, instead of doing the union at the RDD level, you may try caching the new table, unioning it with the old table, and then querying the union-ed table. The drawbacks are higher code complexity and that you end up with lots of temporary tables. But the performance should be reasonable. On Fri, Sep 12, 2014 at 1:19 PM, Archit Thakur archit279tha...@gmail.com wrote: Little code snippet:

line1: cacheTable("existingRDDTableName")
line2: // some operations which will materialize the existingRDD dataset.
line3: existingRDD.union(newRDD).registerAsTable("new_existingRDDTableName")
line4: cacheTable("new_existingRDDTableName")
line5: // some operation that will materialize the new existingRDD.

Now, what we expect is that in line4, rather than caching both existingRDDTableName and new_existingRDDTableName, it should cache only new_existingRDDTableName. But we cannot explicitly uncache existingRDDTableName because we want the union to use the cached existingRDDTableName. Since it is lazy, new_existingRDDTableName could be materialized later, and by then we can't lose existingRDDTableName from the cache. What if we keep the same name for the new table, so:

cacheTable("existingRDDTableName")
existingRDD.union(newRDD).registerAsTable("existingRDDTableName")
cacheTable("existingRDDTableName") // might not be needed again.

Will both our cases be satisfied, i.e. that it uses existingRDDTableName from the cache for the union and doesn't duplicate the data in the cache but somehow appends to the older cacheTable? Thanks and Regards, Archit Thakur. Sr Software Developer, Guavus, Inc. On Sat, Sep 13, 2014 at 12:01 AM, pankaj arora pankajarora.n...@gmail.com wrote: I think I should elaborate on the use case a little more. We have a UI dashboard whose response time is quite fast, as all the data is cached. Users query data based on a time range, and there is also always new data coming into the system at a predefined frequency, let's say every 1 hour. As you said, I can uncache tables, but that will basically drop all data from memory. I cannot afford losing my cache even for a short interval, as all queries from the UI will get slow until the cache loads again. UI response time needs to be predictable and should be fast enough that the user does not get irritated.
Also, I cannot keep two copies of the data in memory (until the new RDD materializes), as that would surpass the total available memory in the system. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-Use-Case-of-mutable-RDD-any-ideas-around-will-help-tp14095p14112.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Example of Geoprocessing with Spark
Hi Abel, Pretty interesting. May I ask how big your point CSV dataset is? It seems you are relying on searching through the FeatureCollection of polygons to find which one intersects your point. This is going to be extremely slow. I highly recommend using a SpatialIndex, such as the many that exist in the JTS library itself, to speed things up. Also, note that the geoscript library is not really maintained anymore. I forked it with the intention of maintaining it some more, but I've decided this is not really a good direction. On Thu, Sep 18, 2014 at 7:02 PM, Abel Coronado Iruegas acoronadoirue...@gmail.com wrote: Now I have a better version, but the problem now is that saveAsTextFile does not finish the job; only a partial temporary file exists in the HDFS repository. Can someone tell me what is wrong? Thanks!!

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Csv Clipper")
    val sc = new SparkContext(conf)
    val csvPath = "hdfs://m01/user/acoronado/mov/movilidad_64mb.csv"
    val csv = sc.textFile(csvPath)
    csv.cache()
    val clipPoints = csv.map({ line: String =>
      val Array(usuario, lat, lon, date) = line.split(",").map(_.trim)
      val punto = Point(lon.toDouble, lat.toDouble)
      val internal = geoDataExternal.get.find(f => f.geometry intersects punto)
      val (cve_est, cve_mun) = internal match {
        case Some(f: org.geoscript.feature.Feature) => {
          val index = f.getAttribute(1).toString()
          val existe = geoDataMun.get(index).find(f => f.geometry intersects punto)
          existe match {
            case Some(f) => (f.getAttribute(1).toString, f.getAttribute(2).toString)
            case None => ("0", "0")
          }
        }
        case None => ("0", "0")
      }
      val time = try {
        (new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$", "+")).getTime().toString()
      } catch {
        case e: Exception => "0"
      }
      line + "," + time + "," + cve_est + "," + cve_mun
    })
    clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_60.csv")
    println("Spark Clip Exito!!!")
  }

  object geoDataMun {
    private val shp = Shapefile("/geoData/MunicipiosLatLon.shp")
    val features = shp.getFeatures.toIterable
    val result = scala.io.Source.fromFile("/geoData/indice_espacial.csv")
      .getLines()
      .toList map { line: String =>
        val campos = line.split(",").map(_.trim)
        val cve_edo = campos(0)
        val cve_mun = campos(1)
        val index = campos(2)
        scala.collection.immutable.List(index.toInt, (cve_edo, cve_mun))
      }
    val mapaIx = result.groupBy(x => x(0)).mapValues(cves => cves.map(x => x(1)))
    def get(index: String) = {
      features.filter(f => mapaIx(index.toInt).contains((f.getAttribute(1).toString, f.getAttribute(2).toString)))
    }
  }

  object geoDataExternal {
    private val shp = Shapefile("/geoData/IndiceRecortado.shp")
    val features = shp.getFeatures
    def get: FeatureCollection[org.geoscript.feature.Schema, org.geoscript.feature.Feature] = features
  }
}

The log of the driver is:

14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://sparkwor...@axaxaxa-cloudera-s05.xxxnetworks.com:44895] - [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error [Association failed with [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942 ]
14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://sparkwor...@axaxaxa-cloudera-s05.xxxnetworks.com:44895] - [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error [Association failed with [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [
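To make the spatial-index suggestion concrete, here is a minimal sketch using the STRtree that ships with JTS. `polygons: Seq[Geometry]` is a stand-in for the shapefile features above, and this assumes the tree serializes for broadcast in your JTS version; build the index once on the driver so each point only runs the exact intersection test against nearby candidates:

import com.vividsolutions.jts.geom.Geometry
import com.vividsolutions.jts.index.strtree.STRtree
import scala.collection.JavaConverters._

val index = new STRtree()
polygons.foreach(p => index.insert(p.getEnvelopeInternal, p))
val bcIndex = sc.broadcast(index)

def containing(punto: Geometry): Option[Geometry] =
  bcIndex.value.query(punto.getEnvelopeInternal).asScala
    .map(_.asInstanceOf[Geometry])
    .find(_.intersects(punto))   // exact test only on bounding-box hits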
Re: Better way to process large image data set ?
What Sean said. You should also definitely turn on Kryo serialization. The default Java serialization is really, really slow if you're going to move around lots of data. Also make sure you use a cluster with high network bandwidth. On Thu, Sep 18, 2014 at 3:06 AM, Sean Owen so...@cloudera.com wrote: Base64 is an inefficient encoding for binary data, by about 2.6x. You could use byte[] directly. But you would still be storing and potentially shuffling lots of data in your RDDs. If the files exist separately on HDFS, perhaps you can just send around the file location and load it directly using the HDFS APIs in the function that needs it. On Sep 18, 2014 9:51 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to process a large image data set and need some way to optimize my implementation, since it's currently very slow. In my current implementation I store my images in an object file with the following fields: case class Image(groupId: String, imageId: String, buffer: String) Images belong to groups and have an id; the buffer is the image file (jpg, png) encoded as a base64 string. Before running an image processing algorithm on the image buffer, I have a lot of jobs that filter, group, and join images in my data set based on groupId or imageId, and these steps are relatively slow. I suspect that Spark moves my image buffers around even when it's not necessary for these specific jobs, and then a lot of time is wasted on communication. Is there a better way to optimize my implementation? Regards, Jaonary - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
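A sketch of both suggestions together: store raw bytes instead of base64 strings, and switch the serializer to Kryo. The property name is the standard one; everything else is illustrative:

import org.apache.spark.SparkConf

case class Image(groupId: String, imageId: String, buffer: Array[Byte]) // raw jpg/png bytes

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")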
Re: Use Case of mutable RDD - any ideas around will help.
SPARK-1671 looks really promising. Note that even right now, you don't need to un-cache the existing table. You can do something like this:

newAdditionRdd.registerTempTable("table2")
sqlContext.cacheTable("table2")
val unionedRdd = sqlContext.table("table1").unionAll(sqlContext.table("table2"))

When you use table, it will return you the cached representation, so that the union executes much faster. However, there is some unknown slowdown; it's not quite as fast as what you would expect. On Fri, Sep 12, 2014 at 2:09 PM, Cheng Lian lian.cs@gmail.com wrote: Ah, I see. So basically what you need is something like cache write-through support, which exists in Shark but is not implemented in Spark SQL yet. In Shark, when inserting data into a table that has already been cached, the newly inserted data will be automatically cached and “union”-ed with the existing table content. SPARK-1671 was created to track this feature. We'll work on that. Currently, as a workaround, instead of doing the union at the RDD level, you may try caching the new table, unioning it with the old table, and then querying the union-ed table. The drawbacks are higher code complexity and that you end up with lots of temporary tables. But the performance should be reasonable. On Fri, Sep 12, 2014 at 1:19 PM, Archit Thakur archit279tha...@gmail.com wrote: Little code snippet:

line1: cacheTable("existingRDDTableName")
line2: // some operations which will materialize the existingRDD dataset.
line3: existingRDD.union(newRDD).registerAsTable("new_existingRDDTableName")
line4: cacheTable("new_existingRDDTableName")
line5: // some operation that will materialize the new existingRDD.

Now, what we expect is that in line4, rather than caching both existingRDDTableName and new_existingRDDTableName, it should cache only new_existingRDDTableName. But we cannot explicitly uncache existingRDDTableName because we want the union to use the cached existingRDDTableName. Since it is lazy, new_existingRDDTableName could be materialized later, and by then we can't lose existingRDDTableName from the cache. What if we keep the same name for the new table, so:

cacheTable("existingRDDTableName")
existingRDD.union(newRDD).registerAsTable("existingRDDTableName")
cacheTable("existingRDDTableName") // might not be needed again.

Will both our cases be satisfied, i.e. that it uses existingRDDTableName from the cache for the union and doesn't duplicate the data in the cache but somehow appends to the older cacheTable? Thanks and Regards, Archit Thakur. Sr Software Developer, Guavus, Inc. On Sat, Sep 13, 2014 at 12:01 AM, pankaj arora pankajarora.n...@gmail.com wrote: I think I should elaborate on the use case a little more. We have a UI dashboard whose response time is quite fast, as all the data is cached. Users query data based on a time range, and there is also always new data coming into the system at a predefined frequency, let's say every 1 hour. As you said, I can uncache tables, but that will basically drop all data from memory. I cannot afford losing my cache even for a short interval, as all queries from the UI will get slow until the cache loads again. UI response time needs to be predictable and should be fast enough that the user does not get irritated. Also, I cannot keep two copies of the data in memory (until the new RDD materializes), as that would surpass the total available memory in the system. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-Use-Case-of-mutable-RDD-any-ideas-around-will-help-tp14095p14112.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Message Passing among workers
Asynchrony is not supported directly - Spark's programming model is naturally BSP. I have seen cases where people have instantiated actors with Akka on worker nodes to enable message passing, or even used Spark's own ActorSystem to do this. But I do not recommend this, since you lose a bunch of the benefits of Spark - e.g. fault tolerance. Instead, I would think about whether your algorithm can be cast as a BSP one, or think about how frequently you really need to synchronize state among your workers. It may be that having the occasional synchronization barrier is OK. On Wed, Sep 3, 2014 at 7:28 AM, laxmanvemula laxman8...@gmail.com wrote: Hi, I would like to implement an asynchronous distributed optimization algorithm where workers communicate with one another. It is similar to belief propagation, where each worker is a vertex in the graph. Can someone let me know if this is possible using Spark? Thanks, Laxman -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Message-Passing-among-workers-tp13355.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: mllib performance on cluster
I spoke with SK offline about this; it looks like the difference in timings came from the fact that he was training 100 models for 100 iterations each and taking the total time (vs. my example, which trains a single model for 100 iterations). I'm posting my response here, though, because I think it's worth documenting: Benchmarking on a dataset this small with this many cores is probably not going to give you any meaningful information about how the algorithms scale to real data problems. In this case, you've thrown 200 cores at 5.6 KB of data - 200 low-dimensional data points. The overheads of scheduling tasks, sending them out to each worker, and network latencies between the nodes, which are essentially fixed regardless of problem size, are COMPLETELY dominating the time spent computing - which in the first two cases is 9-10 flops per data point, and in the last case is a couple of array lookups and adds per data point. It would make a lot more sense to find or generate a dataset that's 10 or 100 GB and see how performance scales there. You can do this with the code I pasted earlier; just change the second, third, and fourth arguments to an appropriate number of elements, dimensionality, and number of partitions that matches the number of cores you have on your cluster. In short, don't use a cluster unless you need one :). Hope this helps! On Tue, Sep 2, 2014 at 3:51 PM, SK skrishna...@gmail.com wrote: The dataset is quite small: 5.6 KB. It has 200 rows and 3 features, and 1 column of labels. From this dataset, I split 80% for the training set and 20% for the test set. The features are integer counts and the labels are binary (1/0). thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mllib-performance-on-cluster-tp13290p13311.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: mllib performance on cluster
How many iterations are you running? Can you provide exact details about the size of the dataset (how many data points, how many features)? Is it sparse or dense - and in the sparse case, how many non-zeroes? How many partitions does your data RDD have? For very small datasets, the scheduling overheads of shipping tasks across the cluster and delays due to stragglers can dominate the time actually spent doing your parallel computation. If you have too few partitions, you won't be taking advantage of cluster parallelism, and if you have too many you're introducing even more of the aforementioned overheads. On Tue, Sep 2, 2014 at 11:24 AM, SK skrishna...@gmail.com wrote: Hi, I evaluated the runtime performance of some of the MLlib classification algorithms on a local machine and a cluster with 10 nodes. I used standalone mode and Spark 1.0.1 in both cases. Here are the results for the total runtime:

                      Local     Cluster
Logistic regression   138 sec   336 sec
SVM                   138 sec   336 sec
Decision tree          50 sec   132 sec

My dataset is quite small, and my programs are very similar to the MLlib examples that are included in the Spark distribution. Why is the runtime on the cluster significantly higher (almost 3 times) than that on the local machine, even though the former uses more memory and more nodes? Is it because of the communication overhead on the cluster? I would like to know if there is something I need to be doing to optimize the performance on the cluster, or if others have also been getting similar results. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mllib-performance-on-cluster-tp13290.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
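For reference, a quick sketch of inspecting and adjusting the partitioning discussed above (`data` is a placeholder RDD; the 2-4 tasks-per-core rule of thumb is only a starting point):

val numParts = data.partitions.length          // how many partitions the RDD currently has
val target = sc.defaultParallelism * 2         // e.g. a couple of tasks per core
val repartitioned = data.repartition(target)   // full shuffle; use coalesce to only shrink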
Re: mllib performance on cluster
Hmm... something is fishy here. That's a *really* small dataset for a Spark job, so almost all your time will be spent in these overheads, but you should still be able to train a logistic regression model with the default options and 100 iterations in under 1s on a single machine. Are you caching your dataset before training the classifier on it? It's possible that you're rereading it from disk (or across the internet, maybe) on every iteration. From spark-shell:

import org.apache.spark.mllib.util.LogisticRegressionDataGenerator
val dat = LogisticRegressionDataGenerator.generateLogisticRDD(sc, 200, 3, 1e-4, 4, 0.2).cache()
println(dat.count()) // should give 200

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
val start = System.currentTimeMillis
val model = LogisticRegressionWithSGD.train(dat, 100)
val delta = System.currentTimeMillis - start
println(delta) // On my laptop, 863ms.

On Tue, Sep 2, 2014 at 3:51 PM, SK skrishna...@gmail.com wrote: The dataset is quite small: 5.6 KB. It has 200 rows and 3 features, and 1 column of labels. From this dataset, I split 80% for the training set and 20% for the test set. The features are integer counts and the labels are binary (1/0). thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mllib-performance-on-cluster-tp13290p13311.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Finding previous and next element in a sorted RDD
There's no way to avoid a shuffle, due to the first and last elements of each partition needing to be computed with the others, but I wonder if there is a way to do a minimal shuffle. On Thu, Aug 21, 2014 at 6:13 PM, cjwang c...@cjwang.us wrote: One way is to do zipWithIndex on the RDD. Then use the index as a key. Add or subtract 1 for the previous or next element. Then use cogroup or join to bind them together.

val idx = input.zipWithIndex
val previous = idx.map(x => (x._2 + 1, x._1))
val current = idx.map(x => (x._2, x._1))
val next = idx.map(x => (x._2 - 1, x._1))
val joined = current leftOuterJoin previous leftOuterJoin next

Code looks clean to me, but I feel uneasy about the performance of join. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Finding-previous-and-next-element-in-a-sorted-RDD-tp12621p12623.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
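One way to realize the "minimal shuffle" idea: ship only the first element of each partition to the driver, broadcast that small map, and pair neighbors locally within each partition. A sketch under the assumptions that the RDD is already sorted and has no empty partitions:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

def withNext[T: ClassTag](rdd: RDD[T]): RDD[(T, Option[T])] = {
  // one small job to fetch the head of every partition
  val heads = rdd.mapPartitionsWithIndex { (i, it) =>
    if (it.hasNext) Iterator((i, it.next())) else Iterator.empty
  }.collect().toMap
  val bcHeads = rdd.sparkContext.broadcast(heads)
  rdd.mapPartitionsWithIndex { (i, it) =>
    val boundary = bcHeads.value.get(i + 1) // next partition's first element
    val buf = it.buffered
    new Iterator[(T, Option[T])] {
      def hasNext = buf.hasNext
      def next() = {
        val cur = buf.next()
        (cur, if (buf.hasNext) Some(buf.head) else boundary)
      }
    }
  }
}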
Merging two Spark SQL tables?
Is it possible to merge two cached Spark SQL tables into a single table so it can be queried with one SQL statement? ie, can you do schemaRdd1.union(schemaRdd2), then register the new SchemaRDD and run a query over it? Ideally, both schemaRdd1 and schemaRdd2 would be cached, so the union should run cached too. thanks, Evan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Merging two Spark SQL tables?
So I tried the above (why don't union and ++ have the same behavior, btw?) and it works, but it is slow because the original RDDs are not cached and the files must be read from disk. I also discovered that you can recover the in-memory cached versions of the RDDs using sqlContext.table("table1"). Thus you can do sqlContext.table("table1").unionAll(sqlContext.table("table2")), but this is like 10x slower than running the query on table1, which is cached using sqlContext.cacheTable() (at least on Spark 1.0.2; I haven't tried the 1.1.0 snapshot yet). On Thu, Aug 21, 2014 at 12:17 AM, Michael Armbrust mich...@databricks.com wrote: I believe this should work if you run srdd1.unionAll(srdd2). Both RDDs must have the same schema. On Wed, Aug 20, 2014 at 11:30 PM, Evan Chan velvia.git...@gmail.com wrote: Is it possible to merge two cached Spark SQL tables into a single table so it can be queried with one SQL statement? ie, can you do schemaRdd1.union(schemaRdd2), then register the new SchemaRDD and run a query over it? Ideally, both schemaRdd1 and schemaRdd2 would be cached, so the union should run cached too. thanks, Evan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Writeup on Spark SQL with GDELT
I just put up a repo with a write-up on how to import the GDELT public dataset into Spark SQL and play around. Has a lot of notes on different import methods and observations about Spark SQL. Feel free to have a look and comment. http://www.github.com/velvia/spark-sql-gdelt - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
[Tachyon] Error reading from Parquet files in HDFS
Spark 1.0.2, Tachyon 0.4.1, Hadoop 1.0 (standard EC2 config)

scala> val gdeltT = sqlContext.parquetFile("tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/")

14/08/21 19:07:14 INFO : initialize(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005, Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml). Connecting to Tachyon: tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005
14/08/21 19:07:14 INFO : Trying to connect master @ /172.31.42.40:19998
14/08/21 19:07:14 INFO : User registered at the master ip-172-31-42-40.us-west-2.compute.internal/172.31.42.40:19998 got UserId 14
14/08/21 19:07:14 INFO : Trying to get local worker host : ip-172-31-42-40.us-west-2.compute.internal
14/08/21 19:07:14 INFO : No local worker on ip-172-31-42-40.us-west-2.compute.internal
14/08/21 19:07:14 INFO : Connecting remote worker @ ip-172-31-47-74/172.31.47.74:29998
14/08/21 19:07:14 INFO : tachyon://172.31.42.40:19998 tachyon://172.31.42.40:19998 hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000
14/08/21 19:07:14 INFO : getFileStatus(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005): HDFS Path: hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000/gdelt-parquet/1979-2005 TPath: tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005
14/08/21 19:07:14 INFO : tachyon.client.TachyonFS@4b05b3ff hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000 /gdelt-parquet/1979-2005 tachyon.PrefixList@636c50d3
14/08/21 19:07:14 WARN : tachyon.home is not set. Using /mnt/tachyon_default_home as the default value.
14/08/21 19:07:14 INFO : Get: /gdelt-parquet/1979-2005/_SUCCESS
14/08/21 19:07:14 INFO : Get: /gdelt-parquet/1979-2005/_metadata
14/08/21 19:07:14 INFO : Get: /gdelt-parquet/1979-2005/part-r-1.parquet
14/08/21 19:07:14 INFO : getFileStatus(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/_metadata): HDFS Path: hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000/gdelt-parquet/1979-2005/_metadata TPath: tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/_metadata
14/08/21 19:07:14 INFO : open(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/_metadata, 65536)
14/08/21 19:07:14 ERROR : The machine does not have any local worker.
14/08/21 19:07:14 ERROR : Reading from HDFS directly
14/08/21 19:07:14 ERROR : Reading from HDFS directly
java.io.IOException: can not read class parquet.format.FileMetaData: null
 at parquet.format.Util.read(Util.java:50)
 at parquet.format.Util.readFileMetaData(Util.java:34)
 at parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:310)
 at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:296)

I'm not sure why this is saying that, as the Tachyon UI reports all 8 nodes being up? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [Tachyon] Error reading from Parquet files in HDFS
The underFS is HDFS btw. On Thu, Aug 21, 2014 at 12:22 PM, Evan Chan velvia.git...@gmail.com wrote: Spark 1.0.2, Tachyon 0.4.1, Hadoop 1.0 (standard EC2 config) scala val gdeltT = sqlContext.parquetFile(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/) 14/08/21 19:07:14 INFO : initialize(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005, Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml). Connecting to Tachyon: tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005 14/08/21 19:07:14 INFO : Trying to connect master @ /172.31.42.40:19998 14/08/21 19:07:14 INFO : User registered at the master ip-172-31-42-40.us-west-2.compute.internal/172.31.42.40:19998 got UserId 14 14/08/21 19:07:14 INFO : Trying to get local worker host : ip-172-31-42-40.us-west-2.compute.internal 14/08/21 19:07:14 INFO : No local worker on ip-172-31-42-40.us-west-2.compute.internal 14/08/21 19:07:14 INFO : Connecting remote worker @ ip-172-31-47-74/172.31.47.74:29998 14/08/21 19:07:14 INFO : tachyon://172.31.42.40:19998 tachyon://172.31.42.40:19998 hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000 14/08/21 19:07:14 INFO : getFileStatus(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005): HDFS Path: hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000/gdelt-parquet/1979-2005 TPath: tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005 14/08/21 19:07:14 INFO : tachyon.client.TachyonFS@4b05b3ff hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000 /gdelt-parquet/1979-2005 tachyon.PrefixList@636c50d3 14/08/21 19:07:14 WARN : tachyon.home is not set. Using /mnt/tachyon_default_home as the default value. 14/08/21 19:07:14 INFO : Get: /gdelt-parquet/1979-2005/_SUCCESS 14/08/21 19:07:14 INFO : Get: /gdelt-parquet/1979-2005/_metadata 14/08/21 19:07:14 INFO : Get: /gdelt-parquet/1979-2005/part-r-1.parquet 14/08/21 19:07:14 INFO : getFileStatus(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/_metadata): HDFS Path: hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000/gdelt-parquet/1979-2005/_metadata TPath: tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/_metadata 14/08/21 19:07:14 INFO : open(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/_metadata, 65536) 14/08/21 19:07:14 ERROR : The machine does not have any local worker. 14/08/21 19:07:14 ERROR : Reading from HDFS directly 14/08/21 19:07:14 ERROR : Reading from HDFS directly java.io.IOException: can not read class parquet.format.FileMetaData: null at parquet.format.Util.read(Util.java:50) at parquet.format.Util.readFileMetaData(Util.java:34) at parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:310) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:296) I'm not sure why this is saying that, as the Tachyon UI reports all 8 nodes being up? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [Tachyon] Error reading from Parquet files in HDFS
And it worked earlier with non-parquet directory. On Thu, Aug 21, 2014 at 12:22 PM, Evan Chan velvia.git...@gmail.com wrote: The underFS is HDFS btw. On Thu, Aug 21, 2014 at 12:22 PM, Evan Chan velvia.git...@gmail.com wrote: Spark 1.0.2, Tachyon 0.4.1, Hadoop 1.0 (standard EC2 config) scala val gdeltT = sqlContext.parquetFile(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/) 14/08/21 19:07:14 INFO : initialize(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005, Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml). Connecting to Tachyon: tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005 14/08/21 19:07:14 INFO : Trying to connect master @ /172.31.42.40:19998 14/08/21 19:07:14 INFO : User registered at the master ip-172-31-42-40.us-west-2.compute.internal/172.31.42.40:19998 got UserId 14 14/08/21 19:07:14 INFO : Trying to get local worker host : ip-172-31-42-40.us-west-2.compute.internal 14/08/21 19:07:14 INFO : No local worker on ip-172-31-42-40.us-west-2.compute.internal 14/08/21 19:07:14 INFO : Connecting remote worker @ ip-172-31-47-74/172.31.47.74:29998 14/08/21 19:07:14 INFO : tachyon://172.31.42.40:19998 tachyon://172.31.42.40:19998 hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000 14/08/21 19:07:14 INFO : getFileStatus(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005): HDFS Path: hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000/gdelt-parquet/1979-2005 TPath: tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005 14/08/21 19:07:14 INFO : tachyon.client.TachyonFS@4b05b3ff hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000 /gdelt-parquet/1979-2005 tachyon.PrefixList@636c50d3 14/08/21 19:07:14 WARN : tachyon.home is not set. Using /mnt/tachyon_default_home as the default value. 14/08/21 19:07:14 INFO : Get: /gdelt-parquet/1979-2005/_SUCCESS 14/08/21 19:07:14 INFO : Get: /gdelt-parquet/1979-2005/_metadata 14/08/21 19:07:14 INFO : Get: /gdelt-parquet/1979-2005/part-r-1.parquet 14/08/21 19:07:14 INFO : getFileStatus(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/_metadata): HDFS Path: hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000/gdelt-parquet/1979-2005/_metadata TPath: tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/_metadata 14/08/21 19:07:14 INFO : open(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/_metadata, 65536) 14/08/21 19:07:14 ERROR : The machine does not have any local worker. 14/08/21 19:07:14 ERROR : Reading from HDFS directly 14/08/21 19:07:14 ERROR : Reading from HDFS directly java.io.IOException: can not read class parquet.format.FileMetaData: null at parquet.format.Util.read(Util.java:50) at parquet.format.Util.readFileMetaData(Util.java:34) at parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:310) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:296) I'm not sure why this is saying that, as the Tachyon UI reports all 8 nodes being up? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark-JobServer moving to a new location
Dear community, Wow, I remember when we first open sourced the job server, at the first Spark Summit in December. Since then, more and more of you have started using it and contributing to it. It is awesome to see! If you are not familiar with the spark job server, it is a REST API for managing your Spark jobs and job history and status. In order to make sure the project can continue to move forward independently, new features developed and contributions merged, we are moving the project to a new github organization. The new location is: https://github.com/spark-jobserver/spark-jobserver The git commit history is still there, but unfortunately the pull requests don't migrate over. I'll be contacting each of you with open PRs to move them over to the new location. Happy Hacking! Evan (@velvia) Kelvin (@kelvinchu) Daniel (@dan-null) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: type issue: found RDD[T] expected RDD[A]
That might not be enough. Reflection is used to determine what the fields are, thus your class might actually need to have members corresponding to the fields in the table. I heard that a more generic method of inputting stuff is coming. On Tue, Aug 19, 2014 at 6:43 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Tue, Aug 19, 2014 at 7:01 PM, Patrick McGloin mcgloin.patr...@gmail.com wrote: I think the type of the data contained in your RDD needs to be a known case class and not abstract for createSchemaRDD. This makes sense when you think it needs to know about the fields in the object to create the schema. Exactly this. The actual message pointing to that is: inferred type arguments [T] do not conform to method createSchemaRDD's type parameter bounds [A <: Product] All case classes are automatically subclasses of Product, but otherwise you will have to extend Product and add the required methods yourself. Tobias - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
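A minimal sketch of carrying that bound through your own generic code, so the implicit createSchemaRDD conversion still applies (names here are illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import scala.reflect.runtime.universe.TypeTag

case class Person(name: String, age: Int)  // case classes already extend Product

// the generic helper must declare the same bound createSchemaRDD expects
def registerAs[A <: Product: TypeTag](sqlContext: SQLContext, rdd: RDD[A], name: String): Unit = {
  import sqlContext.createSchemaRDD  // implicit RDD[A <: Product] => SchemaRDD
  rdd.registerAsTable(name)
}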
Re: reduceByKey to get all associated values
Specifically, reduceByKey expects a commutative/associative reduce operation, and will automatically apply it locally before a shuffle, which means it acts like a combiner in MapReduce terms - http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions On Thu, Aug 7, 2014 at 8:15 AM, Cheng Lian lian.cs@gmail.com wrote: The point is that in many cases the operation passed to reduceByKey aggregates data into a much smaller size - say + and * for integers. String concatenation doesn't actually "shrink" data, thus in your case rdd.reduceByKey(_ ++ _) and rdd.groupByKey suffer from similar performance issues. In general, don't do these unless you have to. And in Konstantin's case, I guess he knows what he's doing. At least, we can't know whether we can help to optimize unless further information about the business logic is provided. On Aug 7, 2014, at 10:22 PM, chutium teng@gmail.com wrote: A long time ago, at Spark Summit 2013, Patrick Wendell said in his talk about performance (http://spark-summit.org/talk/wendell-understanding-the-performance-of-spark-applications/) that reduceByKey will be more efficient than groupByKey... he mentioned that groupByKey copies all data over the network. Is that still true? Which one should we choose? Because actually we can replace all uses of groupByKey with reduceByKey. For example, if we want to use groupByKey on an RDD[String, String] to get an RDD[String, Seq[String]], we can also do it with reduceByKey: first, map the RDD[String, String] to RDD[String, Seq[String]]; then call reduceByKey(_ ++ _) on this RDD[String, Seq[String]]. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKey-to-get-all-associated-values-tp11645p11652.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
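The combiner-like behavior in one concrete pair of lines (a sketch; `pairs: RDD[(String, Int)]` is a placeholder):

val bySum = pairs.reduceByKey(_ + _)              // partial sums combined map-side, small shuffle
val bySum2 = pairs.groupByKey().mapValues(_.sum)  // every single value crosses the network first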
Re: How can I implement eigenvalue decomposition in Spark?
Reza Zadeh has contributed the distributed implementation of (Tall/Skinny) SVD (http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html), which is in MLlib (Spark 1.0), and a distributed sparse SVD is coming in Spark 1.1 (https://issues.apache.org/jira/browse/SPARK-1782). If your data is sparse (which it often is in social networks), you may have better luck with this. I haven't tried the GraphX implementation, but those algorithms are often well-suited for power-law distributed graphs as you might see in social networks. FWIW, I believe you need to square the elements of the sigma matrix from the SVD to get the eigenvalues. On Thu, Aug 7, 2014 at 10:20 AM, Sean Owen so...@cloudera.com wrote: (-incubator, +user) If your matrix is symmetric (and real, I presume), and if my linear algebra isn't too rusty, then its SVD is its eigendecomposition. The SingularValueDecomposition object you get back has U and V, both of which have columns that are the eigenvectors. There are a few SVDs in the Spark code. The one in mllib is not distributed (right?) and is probably not an efficient means of computing eigenvectors if you really just want a decomposition of a symmetric matrix. The one I see in graphx is distributed? I haven't used it though. Maybe it could be part of a solution. On Thu, Aug 7, 2014 at 2:21 PM, yaochunnan yaochun...@gmail.com wrote: Our lab needs to do some simulation of online social networks. We need to handle a 5000*5000 adjacency matrix, namely, to get its largest eigenvalue and corresponding eigenvector. Matlab can be used, but it is time-consuming. Is Spark effective for linear algebra calculations and transformations? Later we would have a 500*500 matrix to process. It seems urgent that we find some distributed computation platform. I see SVD has been implemented, and I can get eigenvalues of a matrix through this API. But when I want to get both eigenvalues and eigenvectors, or at least the biggest eigenvalue and the corresponding eigenvector, it seems that current Spark doesn't have such an API. Is it possible that I write the eigenvalue decomposition from scratch? What should I do? Thanks a lot! Miles Yao View this message in context: How can I implement eigenvalue decomposition in Spark? Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
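A sketch of reaching the top of the spectrum through the MLlib SVD mentioned above; `rows: RDD[Vector]` is assumed to hold the rows of the symmetric matrix, and k is a placeholder:

import org.apache.spark.mllib.linalg.distributed.RowMatrix

val mat = new RowMatrix(rows)
val decomp = mat.computeSVD(5, computeU = false)  // top k = 5 singular values/vectors
val sigma = decomp.s   // singular values; these are |eigenvalue|, and equal the
                       // eigenvalues when the matrix is positive semi-definite
val vectors = decomp.V // columns are the corresponding (eigen)vectors in that case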
Re: [MLLib]:choosing the Loss function
The loss functions are represented in the names of the various model families. SVM is hinge loss, LogisticRegression is logistic loss, and LinearRegression is squared (least-squares) loss. These are used internally as arguments to the SGD and L-BFGS optimizers. On Thu, Aug 7, 2014 at 6:31 PM, SK skrishna...@gmail.com wrote: Hi, According to the MLlib guide, there seems to be support for different loss functions. But I could not find a command-line parameter to choose the loss function; I only found regType to choose the regularization. Does MLlib support a parameter to choose the loss function? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-choosing-the-Loss-function-tp11738.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Problem reading from S3 in standalone application
Try s3n:// On Aug 6, 2014, at 12:22 AM, sparkuser2345 hm.spark.u...@gmail.com wrote: I'm getting the same "Input path does not exist" error even after setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables and using the format s3://bucket-name/test_data.txt for the input file. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-reading-from-S3-in-standalone-application-tp11524p11526.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Computing mean and standard deviation by key
Computing the variance is similar to this example; you just need to keep around the sum of squares as well. The formula for the variance is (sumsq/n) - (sum/n)^2. But with big datasets or large values, you can quickly run into overflow issues - MLlib handles this by maintaining the average sum of squares in an online fashion (see: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala#L83). You might consider just calling into the MLlib stats module directly. On Fri, Aug 1, 2014 at 1:48 PM, Xu (Simon) Chen xche...@gmail.com wrote: I meant I'm not sure how to do variance in one shot :-) With the mean in hand, you can obviously broadcast the variable, and do another map/reduce to calculate the variance per key. On Fri, Aug 1, 2014 at 4:39 PM, Xu (Simon) Chen xche...@gmail.com wrote:

val res = rdd.map(t => (t._1, (t._2.foo, 1))).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).collect

This gives you a list of (key, (tot, count)), from which you can easily calculate the mean. Not sure about variance. On Fri, Aug 1, 2014 at 2:55 PM, kriskalish k...@kalish.net wrote: I have what seems like a relatively straightforward task to accomplish, but I cannot seem to figure it out from the Spark documentation or by searching the mailing list. I have an RDD[(String, MyClass)] that I would like to group by the key, and calculate the mean and standard deviation of the foo field of MyClass. It feels like I should be able to use groupByKey to get an RDD for each unique key, but it gives me an iterable. As in:

val grouped = rdd.groupByKey()
grouped.foreach { g =>
  val mean = g.map(x => x.foo).mean()
  val dev = g.map(x => x.foo).stddev()
  // do fancy things with the mean and deviation
}

However, there seems to be no way to convert the iterable into an RDD. Is there some other technique for doing this? I'm to the point where I'm considering copying and pasting the StatCounter class and changing the type from Double to MyClass (or making it generic). Am I going down the wrong path? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Computing-mean-and-standard-deviation-by-key-tp11192.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
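A sketch of "calling into the MLlib stats module directly" per key, using the summarizer linked above. This assumes Spark 1.1+, where both aggregateByKey and the (DeveloperApi) MultivariateOnlineSummarizer are available, and `rdd: RDD[(String, MyClass)]` with a Double field `foo` as in the thread:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

val statsByKey = rdd.aggregateByKey(new MultivariateOnlineSummarizer())(
  (summ, v) => summ.add(Vectors.dense(v.foo)),  // numerically stable online update
  (s1, s2) => s1.merge(s2))                     // merge per-partition summaries
val meanAndStd = statsByKey.mapValues(s => (s.mean(0), math.sqrt(s.variance(0))))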
Re: Computing mean and standard deviation by key
Ignoring my warning about overflow - and to be even more functional - just use reduceByKey. Since your main operation is just a bunch of summing, you've got a commutative, associative reduce operation, and Spark will do everything cluster-parallel, then shuffle the (small) result set and merge appropriately. For example:

input
  .map { case (k, v) => (k, (1, v, v*v)) }
  .reduceByKey { case ((c1, s1, ss1), (c2, s2, ss2)) => (c1 + c2, s1 + s2, ss1 + ss2) }
  .map { case (k, (count, sum, sumsq)) => (k, sumsq/count - (sum/count * sum/count)) }

This is by no means the most memory/time efficient way to do it, but I think it's a nice example of how to think about using Spark at a higher level of abstraction. - Evan On Fri, Aug 1, 2014 at 2:00 PM, Sean Owen so...@cloudera.com wrote: Here's the more functional programming-friendly take on the computation (but yeah, this is the naive formula):

rdd.groupByKey.mapValues { mcs =>
  val values = mcs.map(_.foo.toDouble)
  val n = values.size
  val sum = values.sum
  val sumSquares = values.map(x => x * x).sum
  math.sqrt(n * sumSquares - sum * sum) / n
}

This gives you a bunch of (key, stdev). I think you want to compute this RDD and *then* do something to save it if you like. Sure, that could be collecting it locally and saving to a DB. Or you could use foreach to do something remotely for every key-value pair. More efficient would be to mapPartitions and do something to a whole partition of key-value pairs at a time. On Fri, Aug 1, 2014 at 9:56 PM, kriskalish k...@kalish.net wrote: So if I do something like this, Spark handles the parallelization and recombination of sum and count on the cluster automatically? I started peeking into the source and saw that foreach does submit a job to the cluster, but it looked like the inner function needed to return something to work properly.

val grouped = rdd.groupByKey()
grouped.foreach { x =>
  val iterable = x._2
  var sum = 0.0
  var count = 0
  iterable.foreach { y =>
    sum = sum + y.foo
    count = count + 1
  }
  val mean = sum / count
  // save mean to database...
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Computing-mean-and-standard-deviation-by-key-tp11192p11207.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Decision tree classifier in MLlib
Can you share the dataset via a gist or something, and we can take a look at what's going on? On Fri, Jul 25, 2014 at 10:51 AM, SK skrishna...@gmail.com wrote: Yes, the output is continuous, so I used a threshold to get binary labels: if prediction < threshold, then the class is 0, else 1. I use this binary label to then compute the accuracy. Even with this binary transformation, the accuracy with the decision tree model is low compared to LR or SVM (for the specific dataset I used). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Decision-tree-classifier-in-MLlib-tp9457p10678.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Getting the number of slaves
Try sc.getExecutorStorageStatus.length. SparkContext's getExecutorMemoryStatus or getExecutorStorageStatus will give you back an object per executor - the StorageStatus objects are what drive a lot of the Spark web UI. https://spark.apache.org/docs/1.0.1/api/scala/index.html#org.apache.spark.SparkContext On Thu, Jul 24, 2014 at 11:16 AM, Nicolas Mai nicolas@gmail.com wrote: Hi, Is there a way to get the number of slaves/workers during runtime? I searched online but didn't find anything :/ The application I'm working on will run on different clusters corresponding to different deployment stages (beta - prod). It would be great to get the number of slaves currently in use, in order to set the level of parallelism and RDD partitions based on that number. Thanks! Nicolas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to parallelize model fitting with different cross-validation folds?
To be clear - each of the RDDs is still a distributed dataset, and each of the individual SVM models will be trained in parallel across the cluster. Sean's suggestion effectively has you submitting multiple Spark jobs simultaneously, which, depending on your cluster configuration and the size of your dataset, may or may not be a good idea. There are some tricks you can do to make training multiple models on the same dataset faster, which we're hoping to expose to users in an upcoming release. - Evan On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen so...@cloudera.com wrote: If you call .par on data_kfolded it will become a parallel collection in Scala, and so the maps will happen in parallel. On Jul 5, 2014 9:35 AM, sparkuser2345 hm.spark.u...@gmail.com wrote: Hi, I am trying to fit a logistic regression model with cross validation in Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where each element is a pair of RDDs containing the training and test data: (training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint], test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])

scala> data_kfolded
res21: Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint], org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] = Array((MappedRDD[9] at map at <console>:24,MappedRDD[7] at map at <console>:23), (MappedRDD[13] at map at <console>:24,MappedRDD[11] at map at <console>:23), (MappedRDD[17] at map at <console>:24,MappedRDD[15] at map at <console>:23))

Everything works fine when using data_kfolded:

val validationErrors = data_kfolded.map { datafold =>
  val svmAlg = new SVMWithSGD()
  val model_reg = svmAlg.run(datafold._1)
  val labelAndPreds = datafold._2.map { point =>
    val prediction = model_reg.predict(point.features)
    (point.label, prediction)
  }
  val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
  trainErr.toDouble
}

scala> validationErrors
res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837, 0.29833546734955185)

However, I have understood that the models are not fitted in parallel, as data_kfolded is not an RDD (although it's an array of pairs of RDDs). When running the same code where data_kfolded has been replaced with sc.parallelize(data_kfolded), I get a null pointer exception from the line where the run method of the SVMWithSGD object is called with the training data. I guess this is somehow related to the fact that RDDs can't be accessed from inside a closure. I fail to understand, though, why the first version works and the second doesn't. Most importantly, is there a way to fit the models in parallel? I would really appreciate your help.

val validationErrors = sc.parallelize(data_kfolded).map { datafold =>
  val svmAlg = new SVMWithSGD()
  val model_reg = svmAlg.run(datafold._1) // This line gives the null pointer exception
  val labelAndPreds = datafold._2.map { point =>
    val prediction = model_reg.predict(point.features)
    (point.label, prediction)
  }
  val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
  trainErr.toDouble
}
validationErrors.collect

java.lang.NullPointerException
 at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at org.apache.spark.rdd.RDD.take(RDD.scala:824)
 at org.apache.spark.rdd.RDD.first(RDD.scala:856)
 at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
 at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
 at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252
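For reference, a sketch of Sean's .par suggestion applied to the thread's variables: the array stays local, so no RDD is ever serialized into a closure, and the k training jobs are merely submitted concurrently (treat this as illustrative, not the thread's verified code):

import org.apache.spark.mllib.classification.SVMWithSGD

val validationErrors = data_kfolded.par.map { case (training, test) =>
  val model = new SVMWithSGD().run(training)
  val labelAndPreds = test.map(p => (p.label, model.predict(p.features)))
  labelAndPreds.filter(r => r._1 != r._2).count.toDouble / test.count
}.toArray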
Re: How to use K-fold validation in spark-1.0?
There is a method in org.apache.spark.mllib.util.MLUtils called kFold which will automatically partition your dataset for you into k train/test splits, at which point you can build k different models and aggregate the results. For example (a very rough sketch - assuming I want to do 10-fold cross validation on a binary classification model on a file with 1000 features in it):

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.util.LabelParsers
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

val dat = MLUtils.loadLibSVMFile(sc, "path/to/data", false, 1000)
val cvdat = MLUtils.kFold(dat, 10, 42)
val modelErrors = cvdat.map { case (train, test) =>
  val model = LogisticRegressionWithSGD.train(train, 100, 0.1, 1.0)
  val error = computeError(model, test)
  (model, error)
}
// Average error:
val avgError = modelErrors.map(_._2).reduce(_ + _) / modelErrors.length

Here, I'm assuming you've got some computeError function defined. Note that many of these APIs are marked experimental and thus might change in a future spark release. On Tue, Jun 24, 2014 at 6:44 AM, Eustache DIEMERT eusta...@diemert.fr wrote: I'm interested in this topic too :) Are the MLLib core devs on this list? E/ 2014-06-24 14:19 GMT+02:00 holdingonrobin robinholdin...@gmail.com: Does anyone know anything about it? Or should I actually move this topic to an MLlib-specific mailing list? Any information is appreciated! Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-K-fold-validation-in-spark-1-0-tp8142p8172.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
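One hedged possibility for the assumed computeError helper - the name and contract here are illustrative, not an MLlib API - computing the misclassification rate on the held-out fold:

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.ClassificationModel

// Fraction of test points whose predicted label differs from the true label.
def computeError(model: ClassificationModel, test: RDD[LabeledPoint]): Double = {
  val labelAndPreds = test.map(p => (p.label, model.predict(p.features)))
  labelAndPreds.filter { case (label, pred) => label != pred }.count.toDouble / test.count
}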
Re: MLLib sample data format
These files follow the libsvm format, where each line is a record, the first column is a label, and after that the fields are offset:value, where offset is the offset into the feature vector and value is the value of the input feature. This is a fairly efficient representation for sparse data but can double (or more) the storage requirements for dense data. - Evan On Jun 22, 2014, at 3:35 PM, Justin Yip yipjus...@gmail.com wrote: Hello, I am looking into a couple of MLLib data files in https://github.com/apache/spark/tree/master/data/mllib. But I cannot find any explanation for these files. Does anyone know if they are documented? Thanks. Justin
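For illustration, a made-up record in this format - label 1.0, with value 0.5 at offset 1 and value 2.0 at offset 3 of the feature vector, and all other entries implicitly zero - is stored as the single line:

1.0 1:0.5 3:2.0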
Re: Performance problems on SQL JOIN
Also - you could consider caching your data after the first split (before the first filter); this will prevent you from retrieving the data from S3 twice. On Fri, Jun 20, 2014 at 8:32 AM, Xiangrui Meng men...@gmail.com wrote: Your data source is S3 and the data is used twice. m1.large does not have very good network performance. Please try file.count() and see how fast it goes. -Xiangrui On Jun 20, 2014, at 8:16 AM, mathias math...@socialsignificance.co.uk wrote: Hi there, We're trying out Spark and are experiencing some performance issues using Spark SQL. Can anyone tell us if our results are normal? We are using the Amazon EC2 scripts to create a cluster with 3 workers/executors (m1.large). We tried both Spark 1.0.0 and the git master, and both the Scala and Python shells. Running the following code takes about 5 minutes, which seems a long time for this query.

val file = sc.textFile("s3n:// ... .csv")
val data = file.map(x => x.split('|')) // 300k rows
case class BookingInfo(num_rooms: String, hotelId: String, toDate: String, ...)
val rooms2 = data.filter(x => x(0) == "2").map(x => BookingInfo(x(0), x(1), ... , x(9))) // 50k rows
val rooms3 = data.filter(x => x(0) == "3").map(x => BookingInfo(x(0), x(1), ... , x(9))) // 30k rows
rooms2.registerAsTable("rooms2")
cacheTable("rooms2")
rooms3.registerAsTable("rooms3")
cacheTable("rooms3")
sql("SELECT * FROM rooms2 LEFT JOIN rooms3 ON rooms2.hotelId = rooms3.hotelId AND rooms2.toDate = rooms3.toDate").count()

Are we doing something wrong here? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-problems-on-SQL-JOIN-tp8001.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
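A minimal sketch of that change against the snippet above (same variable names; only the .cache() call is new):

val file = sc.textFile("s3n:// ... .csv")
val data = file.map(x => x.split('|')).cache() // materialized on first use; S3 is read only once

Both rooms2 and rooms3 are then derived from the cached data rather than re-reading from S3 for each filter.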
Re: Is There Any Benchmarks Comparing C++ MPI with Spark
Larry, I don't see any reference to Spark in particular there. Additionally, the benchmark only scales up to datasets that are roughly 10 GB (though I realize they've picked some fairly computationally intensive tasks), and they don't present their results on more than 4 nodes. This can hide things like, for example, a communication pattern that is O(n^2) in the number of cluster nodes. Obviously they've gotten some great performance out of SciDB, but I don't think this answers the MPI vs. Spark question directly. My own experience suggests that as long as your algorithm fits in a BSP programming model, with Spark you can achieve performance that is comparable to a tuned C++/MPI codebase by leveraging the right libraries locally and thinking carefully about what and when you have to communicate. - Evan On Thu, Jun 19, 2014 at 8:48 AM, ldmtwo larry.d.moore...@intel.com wrote: Here is a partial comparison. http://dspace.mit.edu/bitstream/handle/1721.1/82517/MIT-CSAIL-TR-2013-028.pdf?sequence=2 SciDB uses MPI with Intel HW and libraries. Amazing performance at the cost of more work. In case the link stops working: A Complex Analytics Genomics Benchmark, Rebecca Taft, Manasi Vartak, Nadathur Rajagopalan Satish, Narayanan Sundaram, Samuel Madden, and Michael Stonebraker -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-There-Any-Benchmarks-Comparing-C-MPI-with-Spark-tp7661p7919.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How do you run your spark app?
I use SBT, create an assembly, and then add the assembly jars when I create my spark context. The main executor I run with something like java -cp ... MyDriver. That said - as of Spark 1.0 the preferred way to run spark applications is via spark-submit - http://spark.apache.org/docs/latest/submitting-applications.html On Thu, Jun 19, 2014 at 11:36 AM, ldmtwo ldm...@gmail.com wrote: I want to ask this, not because I can't read endless documentation and several tutorials, but because there seem to be many ways of doing things and I keep having issues. How do you run *your* spark app? I had it working when I was only using yarn+hadoop1 (Cloudera), then I had to get Spark and Shark working and ended up upgrading everything and dropped CDH support. Anyways, this is what I used, with master=yarn-client and APP_JAR being Scala code compiled with Maven: java -cp $CLASSPATH -Dspark.jars=$APP_JAR -Dspark.master=$MASTER $CLASSNAME $ARGS Do you use this? Or something else? I could never figure out this method: SPARK_HOME/bin/spark jar APP_JAR ARGS For example: bin/spark-class jar /usr/lib/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar pi 10 10 Do you use SBT or Maven to compile? Or something else? ** It seems that I can't get subscribed to the mailing list, and I tried both my work email and personal. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-you-run-your-spark-app-tp7935.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
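As a concrete illustration of the spark-submit route - the class name, master, jar path, and arguments below are placeholders, not taken from the thread:

# A sketch; com.example.MyDriver and the jar path are made up for illustration.
./bin/spark-submit \
  --class com.example.MyDriver \
  --master yarn-client \
  /path/to/my-app-assembly.jar arg1 arg2

The assembly jar (built with, e.g., sbt assembly) bundles your code and its dependencies, so there's no need to set spark.jars or assemble the classpath by hand.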
Re: Patterns for making multiple aggregations in one pass
This looks like a job for SparkSQL!

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class MyRecord(country: String, name: String, age: Int, hits: Long)
val data = sc.parallelize(Array(
  MyRecord("USA", "Franklin", 24, 234),
  MyRecord("USA", "Bob", 55, 108),
  MyRecord("France", "Remi", 33, 72)))
data.registerAsTable("MyRecords")
val results = sql("SELECT t.country, AVG(t.age), SUM(t.hits) FROM MyRecords t GROUP BY t.country").collect

Now results contains:

Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342])

On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin doris.s@gmail.com wrote: Hi Nick, Instead of using reduceByKey(), you might want to look into using aggregateByKey(), which allows you to return a different value type U instead of the input value type V for each input tuple (K, V). You can define U to be a datatype that holds both the average and total and have seqOp update both fields of U in a single pass. Hope this makes sense, Doris On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas nicholas.cham...@gmail.com wrote: The following is a simplified example of what I am trying to accomplish. Say I have an RDD of objects like this:

{ "country": "USA", "name": "Franklin", "age": 24, "hits": 224 }
{ "country": "USA", "name": "Bob", "age": 55, "hits": 108 }
{ "country": "France", "name": "Remi", "age": 33, "hits": 72 }

I want to find the average age and total number of hits per country. Ideally, I would like to scan the data once and perform both aggregations simultaneously. What is a good approach to doing this? I’m thinking that we’d want to keyBy(country), and then somehow reduceByKey(). The problem is, I don’t know how to approach writing a function that can be passed to reduceByKey() and that will track a running average and total simultaneously. Nick -- View this message in context: Patterns for making multiple aggregations in one pass http://apache-spark-user-list.1001560.n3.nabble.com/Patterns-for-making-multiple-aggregations-in-one-pass-tp7874.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
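A rough sketch of the aggregateByKey() approach Doris outlines, reusing the MyRecord class and data from the SparkSQL example above; the (ageSum, hitSum, count) triple is one possible choice for the accumulator type U:

val byCountry = data.map(r => (r.country, r))
val totals = byCountry.aggregateByKey((0L, 0L, 0L))(
  // seqOp: fold one record into the per-partition accumulator
  (acc, r) => (acc._1 + r.age, acc._2 + r.hits, acc._3 + 1L),
  // combOp: merge accumulators across partitions
  (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))
val results = totals.mapValues { case (ageSum, hitSum, n) =>
  (ageSum.toDouble / n, hitSum) // (average age, total hits)
}.collect

This makes a single pass over the data and computes both aggregates at once.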
Re: pmml with augustus
I should point out that if you don't want to take a polyglot approach to languages and reside solely in the JVM, then you can just use plain old java serialization on the Model objects that come out of MLlib's APIs from Java or Scala, load them up in another process, and call the relevant .predict() method when it comes time to serve. The same approach would probably also work for models trained via MLlib's python APIs, but I haven't tried that. Native PMML serialization would be a nice feature to add to MLlib as a mechanism to transfer models to other environments for further analysis/serving. There's a JIRA discussion about this here: https://issues.apache.org/jira/browse/SPARK-1406 On Tue, Jun 10, 2014 at 10:53 AM, filipus floe...@gmail.com wrote: Thank you very much - I didn't know about the Cascading project at all until now; this project is very interesting. I also get the idea of using Scala as a language for Spark, because I can integrate JVM-based libraries very easily/naturally, if I understand it right. mh... but I could also use Spark as a model engine, Augustus for the serializer, and a third-party product like JPMML for the prediction engine. mh... I get the feeling that I need to do Java, Scala and Python at the same time... first things first - Augustus for PMML output from Spark :-) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pmml-with-augustus-tp7313p7335.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
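A hedged sketch of the plain-JVM-serialization route - no MLlib-specific save API is assumed here, just generic java.io object serialization, with the trained model and the incoming feature vector taken as givens:

import java.io._
import org.apache.spark.mllib.classification.LogisticRegressionModel

// In the training process: write the fitted model to disk.
val out = new ObjectOutputStream(new FileOutputStream("model.bin"))
out.writeObject(model) // model: a trained LogisticRegressionModel (assumed to exist)
out.close()

// In the serving process: read it back and predict.
val in = new ObjectInputStream(new FileInputStream("model.bin"))
val loaded = in.readObject().asInstanceOf[LogisticRegressionModel]
in.close()
val prediction = loaded.predict(features) // features: an incoming feature vector (assumed)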
Re: Random Forest on Spark
Interesting, and thanks for the thoughts. I think we're on the same page with 100s of millions of records. We've tested the tree implementation in MLlib on 1B rows and up to 100 features - though this isn't hitting the 1000s of features you mention. Obviously multi-class support isn't there yet, but I can see your point about deeper trees for many-class problems. Will try them out on some image processing stuff with 1k classes we're doing in the lab once they are more developed, to get a sense for where the issues are. If you're only allocating 2GB/worker you're going to have a hard time getting the real advantages of Spark. For your 1k features causing heap exceptions at depth 5 - are these categorical or continuous? The categorical vars create much smaller histograms. If you're fitting all continuous features, the memory requirements are O(b*d*2^l), where b = number of histogram bins, d = number of features, and l = level of the tree. Even accounting for object overhead, with the default number of bins, the histograms at this depth should be on the order of 10s of MB, not 2GB - so I'm guessing your cached data is occupying a significant chunk of that 2GB? In the tree PR, Hirakendu Das tested down to depth 10 on 500m data points with 20 continuous features and was able to run without running into memory issues (and scaling properties got better as the depth grew). His worker mem was 7.5GB and 30% of that was reserved for caching. If you wanted to go 1000 features at depth 10, I'd estimate a couple of gigs necessary for heap space for the worker to compute/store the histograms, and I guess 2x that on the master to do the reduce. Again, 2GB per worker is pretty tight, because there are overheads of just starting the JVM, launching a worker, loading libraries, etc. - Evan On Apr 17, 2014, at 6:10 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Yes, it should be data specific and perhaps we're biased toward the data sets that we are playing with. To put things in perspective, we're highly interested in (and I believe, our customers are): 1. large (hundreds of millions of rows) 2. multi-class classification - nowadays, dozens of target categories are common and even thousands in some cases - you could imagine that this is a big reason for us requiring more 'complex' models 3. high dimensional with thousands of descriptive and sort-of-independent features. From the theoretical perspective, I would argue that it's usually in the best interest to prune as little as possible. I believe that pruning inherently increases the bias of an individual tree, which RF can't do anything about, while decreasing variance - which is what RF is for. The default pruning criteria for R's reference implementation is a min-node of 1 (meaning a fully-grown tree) for classification, and 5 for regression. I'd imagine they did at least some empirical testing to justify these values at the time - although at a time of small datasets :). FYI, we are also considering the MLlib decision tree for our Gradient Boosting implementation; however, the memory requirement is still a bit too steep (we were getting heap exceptions at a depth limit of 5 with 2GB per worker with approximately 1000 features). Now 2GB per worker is about what we expect our typical customers would tolerate, and I don't think that it's unreasonable for shallow trees. On Thu, Apr 17, 2014 at 3:54 PM, Evan R. Sparks evan.spa...@gmail.com wrote: What kind of data are you training on?
These effects are *highly* data dependent, and while saying "the depth of 10 is simply not adequate to build high-accuracy models" may be accurate for the particular problem you're modeling, it is not true in general. From a statistical perspective, I consider each node in each tree an additional degree of freedom for the model, and all else equal I'd expect a model with fewer degrees of freedom to generalize better. Regardless, if there are lots of use cases for really deep trees, we'd like to hear about them so that we can decide how important they are to support! In the context of CART, pruning very specifically refers to a step *after* a tree has been constructed to some depth, using cross-validation. This was a variance reduction technique in the original tree work that is unnecessary and computationally expensive in the context of forests. In the original Random Forests paper, there are still stopping criteria - usually either minimum leaf size or minimum split improvement (or both) - so training to maximum depth doesn't mean "train until you've completely divided your dataset and there's one point per leaf." My point is that if you set minimum leaf size to something like 0.2% of the dataset, then you're not going to get deeper than 10 or 12 levels with a reasonably balanced tree. With respect to PLANET - our implementation is very much in the spirit of PLANET, but has some key differences - there's good documentation on exactly what the differences are forthcoming, so I
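To make the O(b*d*2^l) histogram estimate from this message concrete, a back-of-the-envelope with assumed numbers: with b = 32 bins, d = 1,000 continuous features, and l = 5, that is 32 × 1,000 × 2^5 = 1,024,000 histogram entries; at 8 bytes per entry, plus per-class counts and JVM object overhead, that lands in the tens of MB - consistent with the estimate above that the histograms at that depth should come nowhere near a 2GB heap.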
Re: Random Forest on Spark
Sorry - I meant to say that Multiclass classification, Gradient Boosting, and Random Forest support based on the recent Decision Tree implementation in MLlib is planned and coming soon. On Thu, Apr 17, 2014 at 12:07 PM, Evan R. Sparks evan.spa...@gmail.com wrote: Multiclass classification, Gradient Boosting, and Random Forest support for based on the recent Decision Tree implementation in MLlib. Sung - I'd be curious to hear about your use of decision trees (and forests) where you want to go to 100+ depth. My experience with random forests has been that people typically build hundreds of shallow trees (maybe depth 7 or 8), rather than a few (or many) really deep trees. Generally speaking, we save passes over the data by computing histograms per variable per split at each *level* of a decision tree. This can blow up as the level of the decision tree gets deep, but I'd recommend a lot more memory than 2-4GB per worker for most big data workloads. On Thu, Apr 17, 2014 at 11:50 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Debasish, we've tested the MLLib decision tree a bit and it eats up too much memory for RF purposes. Once the tree got to depth 8~9, it was easy to get heap exceptions, even with 2~4 GB of memory per worker. With RF, it's very easy to get 100+ depth with even only 100,000+ rows (because trees usually are not balanced). Additionally, the lack of multi-class classification limits its applicability. Also, RF requires random features per tree node to be effective (not just bootstrap samples), and the MLLib decision tree doesn't support that. On Thu, Apr 17, 2014 at 10:27 AM, Debasish Das debasish.da...@gmail.com wrote: MLlib has a decision tree... there is a RF PR which is not active now... take that and swap the tree builder with the fast tree builder that's in MLlib... search for the Spark JIRA... the code is based on the Google PLANET paper... I am sure people on the dev list are already working on it... send an email to know the status over there... There is also a RF in Cloudera Oryx, but we could not run it on our data yet. Weka 3.7.10 has a multi-threaded RF that is good for some ad-hoc runs, but it does not scale... On Apr 17, 2014 2:45 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, For one of my applications, I want to use Random Forests (RF) on top of Spark. I see that currently MLLib does not have an implementation for RF. What other open-source RF implementations would be good to use with Spark in terms of speed? Regards, Laeeq Ahmed, KTH, Sweden.
Re: Random Forest on Spark
What kind of data are you training on? These effects are *highly* data dependent, and while saying "the depth of 10 is simply not adequate to build high-accuracy models" may be accurate for the particular problem you're modeling, it is not true in general. From a statistical perspective, I consider each node in each tree an additional degree of freedom for the model, and all else equal I'd expect a model with fewer degrees of freedom to generalize better. Regardless, if there are lots of use cases for really deep trees, we'd like to hear about them so that we can decide how important they are to support! In the context of CART, pruning very specifically refers to a step *after* a tree has been constructed to some depth, using cross-validation. This was a variance reduction technique in the original tree work that is unnecessary and computationally expensive in the context of forests. In the original Random Forests paper, there are still stopping criteria - usually either minimum leaf size or minimum split improvement (or both) - so training to maximum depth doesn't mean "train until you've completely divided your dataset and there's one point per leaf." My point is that if you set minimum leaf size to something like 0.2% of the dataset, then you're not going to get deeper than 10 or 12 levels with a reasonably balanced tree. With respect to PLANET - our implementation is very much in the spirit of PLANET, but has some key differences - there's good documentation on exactly what the differences are forthcoming, so I won't belabor these here. The differences are designed to 1) avoid data shuffling, and 2) minimize the number of passes over the training data. Of course, there are tradeoffs involved, and there is at least one really good trick in the PLANET work that we should leverage that we aren't yet - namely, once the nodes get small enough for the data to fit easily on a single machine, the data can be shuffled and then the remainder of the tree can be trained in parallel from each lower node on a single machine. This would actually help with the memory overheads in model training when trees get deep - if someone wants to modify the current implementation of trees in MLlib and contribute this optimization as a pull request, it would be welcome! At any rate, we'll take this feedback into account with respect to improving the tree implementation, but if anyone can send over use cases or (even better) datasets where really deep trees are necessary, that would be great! On Thu, Apr 17, 2014 at 1:43 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Well, if you read the original paper, http://oz.berkeley.edu/~breiman/randomforest2001.pdf: "Grow the tree using CART methodology to maximum size and do not prune." Now, The Elements of Statistical Learning (page 598) says that you could potentially overfit a fully-grown regression random forest. However, this effect is very slight, and likely negligible for classification. http://www.stanford.edu/~hastie/local.ftp/Springer/OLD/ESLII_print4.pdf In our experiments, however, if the pruning is drastic, then the performance actually becomes much worse. This makes intuitive sense IMO because a decision tree is a non-parametric model, and the expressibility of a tree depends on the number of nodes. With a huge amount of data (millions or even billions of rows), we found that a depth of 10 is simply not adequate to build high-accuracy models. On Thu, Apr 17, 2014 at 12:30 PM, Evan R. Sparks evan.spa...@gmail.com wrote: Hmm...
can you provide some pointers to examples where deep trees are helpful? Typically with Decision Trees you limit depth (either directly, or indirectly with minimum node size and minimum improvement criteria) to avoid overfitting. I agree with the assessment that forests are a variance reduction technique, but I'd be a little surprised if a bunch of hugely deep trees don't overfit to training data. I guess I view limiting tree depth as an analogue to regularization in linear models. On Thu, Apr 17, 2014 at 12:19 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Evan, I actually haven't heard of 'shallow' random forests. I think that the only scenarios where shallow trees are useful are boosting scenarios. AFAIK, Random Forest is a variance-reducing technique and doesn't do much about bias (although some people claim that it does have some bias-reducing effect). Because shallow trees typically have higher bias than fully-grown trees, people don't often use shallow trees with RF. You can confirm this through some experiments with R's random forest implementation as well. They allow you to set some limits on depth and/or pruning. In contrast, boosting is a bias reduction technique (and increases variance), so people typically use shallow trees. Our empirical experiments also confirmed that shallow trees resulted in drastically lower accuracy for random forests. There are some papers that mix boosting-like
Re: Status of MLI?
That work is under submission at an academic conference and will be made available if/when the paper is published. In terms of algorithms for hyperparameter tuning, we consider Grid Search, Random Search, a couple of older derivative-free optimization methods, and a few newer methods - TPE (aka HyperOpt from James Bergstra), SMAC (from Frank Hutter's group), and Spearmint (Jasper Snoek's method) - the short answer is that in our hands Random Search works surprisingly well for the low-dimensional problems we looked at, but TPE and SMAC perform slightly better. I've got a private branch with TPE (as well as random and grid search) integrated with MLI, but the code is research quality right now and not extremely general. We're actively working on bringing these things up to snuff for a proper open source release. On Fri, Apr 4, 2014 at 11:28 AM, Yi Zou yi.zou.li...@gmail.com wrote: Hi, Evan, Just noticed this thread, do you mind sharing more details regarding algorithms targeted at hyperparameter tuning/model selection? Or a link to the dev git repo for that work. thanks, yi On Wed, Apr 2, 2014 at 6:03 PM, Evan R. Sparks evan.spa...@gmail.com wrote: Targeting 0.9.0 should work out of the box (just a change to the build.sbt) - I'll push some changes I've been sitting on to the public repo in the next couple of days. On Wed, Apr 2, 2014 at 4:05 AM, Krakna H shankark+...@gmail.com wrote: Thanks for the update Evan! In terms of using MLI, I see that the Github code is linked to Spark 0.8; will it not work with 0.9 (which is what I have set up) or higher versions? On Wed, Apr 2, 2014 at 1:44 AM, Evan R. Sparks [via Apache Spark User List] [hidden email] wrote: Hi there, MLlib is the first component of MLbase - MLI and the higher levels of the stack are still being developed. Look for updates in terms of our progress on the hyperparameter tuning/model selection problem in the next month or so! - Evan On Tue, Apr 1, 2014 at 8:05 PM, Krakna H [hidden email] wrote: Hi Nan, I was actually referring to MLI/MLBase (http://www.mlbase.org); is this being actively developed? I'm familiar with mllib and have been looking at its documentation. Thanks! On Tue, Apr 1, 2014 at 10:44 PM, Nan Zhu [via Apache Spark User List] [hidden email] wrote: mllib has been part of the Spark distribution (under the mllib directory), also check http://spark.apache.org/docs/latest/mllib-guide.html and for JIRA, because of the recent migration to the Apache JIRA, I think all mllib-related issues should be under the Spark umbrella, https://issues.apache.org/jira/browse/SPARK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel -- Nan Zhu On Tuesday, April 1, 2014 at 10:38 PM, Krakna H wrote: What is the current development status of MLI/MLBase? I see that the github repo is lying dormant (https://github.com/amplab/MLI) and JIRA has had no activity in the last 30 days (https://spark-project.atlassian.net/browse/MLI/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel). Is the plan to add a lot of this into mllib itself without needing a separate API? Thanks! -- View this message in context: Status of MLI? http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
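Since Random Search comes up as the surprisingly strong baseline here, a rough sketch of what it looks like over two hyperparameters (step size and iteration count) with MLlib's SGD-based logistic regression - the search ranges, the train/test RDDs, and the computeError evaluation helper are all assumptions for illustration:

import scala.util.Random
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

val rng = new Random(42)
// Sample 20 candidate configurations: log-uniform step size in [1e-3, 1),
// uniform iteration count in [50, 200).
val candidates = Seq.fill(20) {
  (math.pow(10, -3.0 + 3.0 * rng.nextDouble), 50 + rng.nextInt(150))
}
val scored = candidates.map { case (stepSize, numIters) =>
  val model = LogisticRegressionWithSGD.train(train, numIters, stepSize)
  ((stepSize, numIters), computeError(model, test)) // assumed held-out evaluation
}
val (bestConfig, bestError) = scored.minBy(_._2)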
Re: Status of MLI?
Targeting 0.9.0 should work out of the box (just a change to the build.sbt) - I'll push some changes I've been sitting on to the public repo in the next couple of days. On Wed, Apr 2, 2014 at 4:05 AM, Krakna H shankark+...@gmail.com wrote: Thanks for the update Evan! In terms of using MLI, I see that the Github code is linked to Spark 0.8; will it not work with 0.9 (which is what I have set up) or higher versions? On Wed, Apr 2, 2014 at 1:44 AM, Evan R. Sparks [via Apache Spark User List] [hidden email] wrote: Hi there, MLlib is the first component of MLbase - MLI and the higher levels of the stack are still being developed. Look for updates in terms of our progress on the hyperparameter tuning/model selection problem in the next month or so! - Evan On Tue, Apr 1, 2014 at 8:05 PM, Krakna H [hidden email] wrote: Hi Nan, I was actually referring to MLI/MLBase (http://www.mlbase.org); is this being actively developed? I'm familiar with mllib and have been looking at its documentation. Thanks! On Tue, Apr 1, 2014 at 10:44 PM, Nan Zhu [via Apache Spark User List] [hidden email] wrote: mllib has been part of the Spark distribution (under the mllib directory), also check http://spark.apache.org/docs/latest/mllib-guide.html and for JIRA, because of the recent migration to the Apache JIRA, I think all mllib-related issues should be under the Spark umbrella, https://issues.apache.org/jira/browse/SPARK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel -- Nan Zhu On Tuesday, April 1, 2014 at 10:38 PM, Krakna H wrote: What is the current development status of MLI/MLBase? I see that the github repo is lying dormant (https://github.com/amplab/MLI) and JIRA has had no activity in the last 30 days (https://spark-project.atlassian.net/browse/MLI/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel). Is the plan to add a lot of this into mllib itself without needing a separate API? Thanks! -- View this message in context: Status of MLI? http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
-- View this message in context: Re: Status of MLI? http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610p3632.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Status of MLI?
Hi there, MLlib is the first component of MLbase - MLI and the higher levels of the stack are still being developed. Look for updates in terms of our progress on the hyperparameter tuning/model selection problem in the next month or so! - Evan On Tue, Apr 1, 2014 at 8:05 PM, Krakna H shankark+...@gmail.com wrote: Hi Nan, I was actually referring to MLI/MLBase (http://www.mlbase.org); is this being actively developed? I'm familiar with mllib and have been looking at its documentation. Thanks! On Tue, Apr 1, 2014 at 10:44 PM, Nan Zhu [via Apache Spark User List] [hidden email] wrote: mllib has been part of the Spark distribution (under the mllib directory), also check http://spark.apache.org/docs/latest/mllib-guide.html and for JIRA, because of the recent migration to the Apache JIRA, I think all mllib-related issues should be under the Spark umbrella, https://issues.apache.org/jira/browse/SPARK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel -- Nan Zhu On Tuesday, April 1, 2014 at 10:38 PM, Krakna H wrote: What is the current development status of MLI/MLBase? I see that the github repo is lying dormant (https://github.com/amplab/MLI) and JIRA has had no activity in the last 30 days (https://spark-project.atlassian.net/browse/MLI/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel). Is the plan to add a lot of this into mllib itself without needing a separate API? Thanks! -- View this message in context: Status of MLI? http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- View this message in context: Re: Status of MLI? http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610p3612.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: [HELP] ask for some information about public data set
Hi hyqgod, This is probably a better question for the Spark user's list than the dev list (cc'ing user and bcc'ing dev on this reply). To answer your question, though: Amazon's Public Datasets page is a nice place to start: http://aws.amazon.com/datasets/ - these work well with Spark because they're often stored on S3 (which Spark can read from natively), and it's very easy to spin up a Spark cluster on EC2 to begin experimenting with the data. There's also a pretty good list of (mostly big) datasets that Google has released over the years here: http://svonava.com/post/62186512058/datasets-released-by-google - Evan On Tue, Feb 25, 2014 at 6:33 PM, 黄远强 hyq...@163.com wrote: Hi all: I am a newcomer to the Spark community and dream of becoming an expert in the field of big data, but I have no idea where to start now that I have gone through the published documents on the Spark website and the examples in the Spark source code. I want to know whether there are some public data sets on the internet that I can use to learn Spark and to test some new ideas built on Spark. Thanks a lot. --- Best regards hyqgod