Re: Having Spark read a JSON file
Nick,

If you don't want to use Avro, Thrift, Protobuf, etc., use a library like lift-json: write the JSON as strings, read the file back as a text file, and deserialize each record using lift-json. You can use standard separators like comma or tab. I am sure there are better ways to do it, but I am new to Spark as well.

Deb

On Feb 23, 2014 9:10 PM, nicholas.chammas nicholas.cham...@gmail.com wrote:

I'm new to this field, but it seems like most Big Data examples -- Spark's included -- begin with reading in flat lines of text from a file. How would I go about having Spark turn a large JSON file into an RDD? The file would just be a text file that looks like this: [{...}, {...}, ...] where the individual JSON objects are arbitrarily complex (i.e. not necessarily flat) and may or may not be on separate lines. Basically, I'm guessing Spark would need to parse the JSON since it cannot rely on newlines as a delimiter. That sounds like a costly thing. Is JSON a bad format to have to deal with, or can Spark efficiently ingest and work with data in this format? If it can, can I get a pointer as to how I would do that?

Nick
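[Editor's sketch] A minimal sketch of this suggestion, assuming one complete JSON object per line; the Record case class, cluster URL, and input path are hypothetical placeholders, and the parsing uses lift-json's parse/extract API:

import net.liftweb.json._
import org.apache.spark.SparkContext

// Hypothetical record type; adjust the fields to match your JSON.
case class Record(id: Int, name: String)

object JsonIngest {
  def main(args: Array[String]) {
    val sc = new SparkContext("spark://master:7077", "JsonIngest")
    implicit val formats = DefaultFormats

    // Assumes the upstream writer emitted one complete JSON object per
    // line, so Spark can split on newlines without parsing across records.
    val records = sc.textFile("hdfs:///path/to/input")
      .map(line => parse(line).extract[Record])

    println(records.count())
  }
}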
Re: MLLib Sparse Input
Hi Xiangrui,

We are also adding support for sparse formats in MLlib... if you have a pull request or JIRA link, could you please point to it? jblas did not implement sparse formats the last time I looked at it, but Colt had sparse formats which could be reused...

Thanks.
Deb

On Jan 31, 2014 11:15 AM, Xiangrui Meng men...@gmail.com wrote:

Hi Jason,

Sorry, I didn't see this message before I replied in another thread, so the following is copy-and-paste: We are currently working on sparse data support, one of the highest-priority features for MLlib. All existing algorithms will support sparse input. We will open a JIRA ticket for progress tracking and discussions.

Best,
Xiangrui

On Fri, Jan 31, 2014 at 10:49 AM, jshao jasonsh...@gmail.com wrote:

Hi,

Spark is absolutely amazing for machine learning, as its iterative process is super fast. However, one big issue I realized is that the MLlib API isn't suitable for sparse inputs at all, because it requires the feature vector to be a dense array. For example, I currently want to run a logistic regression on data that is wide and sparse (each data point might have 3 million fields, with most of them being 0). It is impossible to represent each data point as an array of length 3 million. Can I expect/contribute to any changes that might handle sparse inputs?

Thanks,
Jason
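[Editor's sketch] To make the representation under discussion concrete, here is a minimal Scala illustration (not MLlib's eventual API) of a sparse vector stored as parallel index/value arrays, the kind of format jblas lacks and Colt provides:

// A minimal sparse vector: parallel arrays of the active indices and values.
// Illustration of the representation only, not MLlib's API.
case class SparseVector(size: Int, indices: Array[Int], values: Array[Double]) {
  // Dot product against a dense weight vector, touching only the non-zeros.
  def dot(dense: Array[Double]): Double = {
    var sum = 0.0
    var i = 0
    while (i < indices.length) {
      sum += values(i) * dense(indices(i))
      i += 1
    }
    sum
  }
}

// A data point with 3 million fields but only 3 non-zeros stays tiny in memory.
val x = SparseVector(3000000, Array(7, 42, 1999999), Array(1.0, 0.5, 2.0))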
Re: Hadoop MapReduce on Spark
Most of the use cases fall into two categories:

1. Pre-processing over TB/PB-scale data, where the data size is larger than the total RAM available on the cluster. Due to the maturity of MapReduce, a DAG-based job-scheduling framework running on top of MapReduce (Scalding/Cascading and Scrunch/Crunch) gives you the power to write code at a higher abstraction, as Sean mentioned. Since you are shuffling results to disk anyway, I don't see much difference here between MapReduce and Spark pipelines.

2. Running iterative algorithms over features: here the data has already been cleaned in step 1 and you are running algorithmic analysis, perhaps to convergence of some sort. The MapReduce paradigm was not meant for such tasks, and the same analogy holds for distributed graphs and streaming data. Here Spark starts to shine, since you can take the DAG and mark parts of it, or the whole DAG, to be cached in memory (see the sketch below). Scalding/Scrunch could also come up with an API for in-memory caching of parts of the DAG, but it is not available yet.

To sum up, I think we will need both tools for different use cases until they are merged (?) by a higher abstraction layer (hopefully Scalding/Scrunch!).

On Sat, Feb 1, 2014 at 4:43 PM, Sean Owen so...@cloudera.com wrote:

An M/R job is a one-shot job in itself. Making it iterative is what a higher-level controller does, by running it several times and pointing it at the right input. That bit isn't part of M/R. So I don't think you would accomplish this goal by implementing something *under* the M/R API.

M/Rs still get written, but I think most people serious about it are already using higher-level APIs like Apache Crunch or Cascading. For those who haven't seen it, Crunch's abstraction bears a lot of resemblance to the Spark model -- handles on remote collections. So *the reverse* of this suggestion (i.e. a Spark-ish API on M/R) is basically Crunch, or Scrunch if you like Scala. I know Josh Wills has put work into getting Crunch to operate *on top of Spark* even. That might be of interest to the original idea of getting a possibly more familiar API, for some current Hadoop devs, running on top of Spark. (Josh tells me it also enables a few tricks that are hard in Spark.)

-- Sean Owen | Director, Data Science | London

On Sat, Feb 1, 2014 at 11:57 PM, nileshc nil...@nileshc.com wrote:

This might seem like a silly question, so please bear with me. I'm not sure about it myself; I'd just like to know whether you think it's utterly unfeasible or not, and whether it's at all worth doing. Does anyone feel it'd be a good idea to build some sort of library that allows us to write code for Spark using the usual bloated Hadoop API? This is for people who want to run their existing MapReduce code (with nil or minimal adjustments) on Spark to take advantage of its speed and its better support for iterative workflows.
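[Editor's sketch] A hedged Scala sketch of the in-memory DAG caching described in point 2 above; the cluster URL, paths, and the toy iteration are placeholders, not a real algorithm:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("spark://master:7077", "IterativeJob")

// Stage 1: pre-processing, shuffled through disk much like MapReduce.
val cleaned = sc.textFile("hdfs:///data/raw")
  .map(_.split('\t'))
  .filter(_.length == 3)

// Mark this node of the DAG to be kept in memory; the iterations below
// reuse it from RAM instead of recomputing it or re-reading HDFS.
val features = cleaned.map(cols => cols(2).toDouble).cache()

// Stage 2: iterative passes; each one hits the cached RDD.
for (i <- 1 to 10) {
  val mean = features.sum() / features.count()
  println("iteration " + i + ": mean = " + mean)
}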
Re: Advices if your worker die often
I have also seen that if one of the users of the cluster writes some buggy code, the workers die... any idea whether these fixes will also help in that scenario? If you write buggy YARN apps and the code fails on the cluster, the JVMs don't die.

On Jan 23, 2014 3:07 AM, Sam Bessalah samkil...@gmail.com wrote:

Definitely. Thanks. I usually just played around with timeouts before, but this helps. Thx

On Thu, Jan 23, 2014 at 11:56 AM, Guillaume Pitel guillaume.pi...@exensa.com wrote:

Hi sparkers,

So I had this problem where my workers were often dying or disappearing (and I had to manually kill -9 their processes). Sometimes during a computation, sometimes when I Ctrl-C'd the driver, sometimes right at the end of an application execution. It seems this tuning has solved the problem (in spark-env.sh):

export SPARK_DAEMON_JAVA_OPTS="-Dspark.worker.timeout=600 -Dspark.akka.timeout=200 -Dspark.shuffle.consolidateFiles=true"
export SPARK_JAVA_OPTS="-Dspark.worker.timeout=600 -Dspark.akka.timeout=200 -Dspark.shuffle.consolidateFiles=true"

Explanation: I've increased the timeouts because I had a problem where the master was missing a heartbeat, thus removing the worker, and after that complaining that an unknown worker was sending heartbeats. I've also set the consolidateFiles option, because I noticed that deleting the shuffle files in /tmp/spark-local* was taking forever due to the many files my job created.

I also added this to all my programs, right after the creation of the SparkContext (sc = sparkContext), to shut down cleanly when cancelling a job:

sys.addShutdownHook( { sc.stop() } )

Hope this can be useful to someone.

Guillaume

--
Guillaume PITEL, Président
eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Re: Quality of documentation (rant)
Hi Ognen,

We have been running HDFS, YARN, and Spark on 20 beefy nodes. I give half of the cores to Spark and use the rest for YARN MR. To optimize the network transfer during RDD creation, it is better to have Spark run on all the HDFS nodes.

For pre-processing the data for the algorithms I use a YARN MR app, since the input data can be stored in various formats that Spark does not support yet (things like Parquet) but that the platform people like for various reasons, such as data compression. Once the preprocessor saves the data on HDFS as a text file or sequence file, Spark gives you orders-of-magnitude better runtimes compared to the YARN algorithms. I benchmarked ALS and could run our dataset in 14 minutes for 10 iterations, while the scalable ALS algorithm from Cloudera Oryx ran 6 iterations in an hour. Note that they are supposedly implementing the same ALS paper. On the same dataset, Mahout ALS fails, as it needs more memory than the 6 GB that YARN uses by default. I still have to look into the results, and the code, in more detail to be sure what they are doing. Note that the Mahout algorithms are not optimized for YARN yet, and the master Mahout branch is broken for YARN; thanks to Cloudera's help, we could patch it up. The number of YARN algorithms is not very high right now.

CDH 5.0 is integrating Spark with their CDH manager, similar to what they did with Solr. It should be released by March 2014; they have the beta already. It will definitely ease the process of making Spark operational. I have not tested my setup on EC2 (it runs on an internal Hadoop cluster), but for that I will most likely use the CDH manager from the 5.0 beta. I will update you with the EC2 experience.

Thanks.
Deb

On Jan 19, 2014 6:53 AM, Ognen Duzlevski og...@nengoiksvelzud.com wrote:

On Sun, Jan 19, 2014 at 2:49 PM, Ognen Duzlevski og...@nengoiksvelzud.com wrote:

My basic requirement is to set everything up myself and understand it. For testing purposes my cluster has 15 xlarge instances, and I guess I will just set up a Hadoop cluster running over these instances for the purpose of getting the benefits of HDFS. I would then set up HDFS over S3 with blocks. By this I mean I would set up a Hadoop cluster running in parallel on the same instances, just for the purpose of running Spark over HDFS. Is this a reasonable approach? What kind of a performance penalty (memory, CPU cycles) am I going to incur from the Hadoop daemons running just for this purpose?

Thanks!
Ognen
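[Editor's sketch] A hedged sketch of the hand-off described above: once the MR preprocessor has written text or sequence files to HDFS, Spark reads them directly. The paths and key/value types below are placeholders:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // implicit converters for sequenceFile

val sc = new SparkContext("spark://master:7077", "ReadPreprocessed")

// Plain-text output of the MR preprocessing job.
val lines = sc.textFile("hdfs:///etl/output/text")

// SequenceFile output; String here stands in for whatever Writable
// key/value types the preprocessor actually emitted.
val pairs = sc.sequenceFile[String, String]("hdfs:///etl/output/seq")

println(lines.count() + " text records, " + pairs.count() + " sequence records")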
Re: How to make Spark merge the output file?
Hi Nan,

A cleaner approach is to expose a RESTful service to the external system; the external system then calls the service through the appropriate API. For Scala, Spray can be used to build these services, and Twitter's OSS also has many examples of this service design.

Thanks.
Deb

On Tue, Jan 7, 2014 at 10:25 AM, Nan Zhu zhunanmcg...@gmail.com wrote:

Hi, all,

Thanks for the reply. I actually need to provide a single file to an external system to process it…seems that I have to make the consumer of the file support multiple inputs.

Best,
-- Nan Zhu

On Tuesday, January 7, 2014 at 12:37 PM, Aaron Davidson wrote:

HDFS, since 0.21 (https://issues.apache.org/jira/browse/HDFS-222), has a concat() method which would do exactly this, but I am not sure of the performance implications. Of course, as Matei pointed out, it's unusual to actually need a single HDFS file.

On Mon, Jan 6, 2014 at 9:08 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Unfortunately this is expensive to do on HDFS — you’d need a single writer to write the whole file. If your file is small enough for that, you can use coalesce() on the RDD to bring all the data to one node, and then save it. However, most HDFS applications work with directories containing multiple files instead of single files for this reason.

Matei

On Jan 6, 2014, at 10:56 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

Hi, all,

Maybe a stupid question, but is there any way to make Spark write a single file instead of partitioned files?

Best,
-- Nan Zhu
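[Editor's sketch] A hedged sketch of Matei's coalesce() suggestion, assuming the result is small enough for one node to hold and write; the paths and the per-record transformation are placeholders:

import org.apache.spark.SparkContext

val sc = new SparkContext("spark://master:7077", "SingleFileOutput")

// Placeholder per-record logic; substitute the real transformation.
def transform(line: String): String = line.toUpperCase

// coalesce(1) funnels every partition through a single task, so the
// output directory contains one part-00000 file instead of many parts.
sc.textFile("hdfs:///job/input")
  .map(transform)
  .coalesce(1)
  .saveAsTextFile("hdfs:///job/output")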
Re: Spark Matrix Factorization
Hi Dmitriy,

We have a Mahout mirror from GitHub, but I don't see any of the math-scala code. Where do I find the math-scala code? I thought the GitHub mirror was kept in sync with the SVN repo.

Thanks.
Deb

On Fri, Jan 3, 2014 at 10:43 AM, Dmitriy Lyubimov dlie...@gmail.com wrote:

On Fri, Jan 3, 2014 at 10:28 AM, Sebastian Schelter s...@apache.org wrote:

I wonder if anyone might have a recommendation on a Scala-native implementation of SVD.

Mahout has a Scala implementation of an SVD variant called Stochastic SVD: https://svn.apache.org/viewvc/mahout/trunk/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala?view=markup

Mahout also has SVD and eigendecompositions mapped to Scala as svd() and eigen(). Unfortunately I have not put it on the wiki yet, but a summary is available here: https://issues.apache.org/jira/browse/MAHOUT-1297

Mahout also has a distributed PCA implementation (which is based on distributed Stochastic SVD and has special provisions for sparse matrix cases). Unfortunately our wiki is in flux now due to the migration off Confluence to CMS, so the SSVD page has not yet been migrated; the Confluence version is here: https://cwiki.apache.org/confluence/display/MAHOUT/Stochastic+Singular+Value+Decomposition

Otherwise, all the major Java math libraries (mahout-math, jblas, commons-math) should provide an implementation that you can use from Scala.

--sebastian

On Thu, Jan 2, 2014 at 7:06 PM, Ameet Talwalkar am...@eecs.berkeley.edu wrote:

Hi Deb,

Thanks for your email. We currently do not have a DSGD implementation in MLlib. Also, just to clarify, DSGD is not a variant of ALS, but rather a different algorithm for solving the same bi-convex objective function. It would be a good thing to add, but to the best of my knowledge, no one is actively working on this right now. Also, as you mentioned, the ALS implementation in MLlib is more robust/scalable than the one in spark.examples.

-Ameet

On Thu, Jan 2, 2014 at 3:16 PM, Debasish Das debasish.da...@gmail.com wrote:

Hi,

I am not noticing any DSGD implementation of ALS in Spark. There are two ALS implementations: org.apache.spark.examples.SparkALS does not run on large matrices and seems more like demo code, while org.apache.spark.mllib.recommendation.ALS looks and feels like a more robust version, and I am experimenting with it. References here are Jellyfish, Twitter's implementation of Jellyfish called Scalafish (https://github.com/azymnis/scalafish), a Google paper called Sparkler, and a similar idea put forward in the IBM paper by Gemulla et al. (Large-Scale Matrix Factorization with Distributed Stochastic Gradient Descent). Are there any plans to add DSGD to Spark, or are there any existing JIRAs?

Thanks.
Deb
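[Editor's sketch] On the last point above, calling a Java math library's SVD from Scala: a hedged sketch using jblas's dense Singular.fullSVD (the plain dense decomposition, not the stochastic or sparse variants discussed in the thread); the matrix values are arbitrary:

import org.jblas.{DoubleMatrix, Singular}

// A small dense 3x2 matrix.
val a = new DoubleMatrix(Array(
  Array(1.0, 2.0),
  Array(3.0, 4.0),
  Array(5.0, 6.0)))

// fullSVD returns {U, S, V} with A = U * diag(S) * V', where S is the
// vector of singular values in decreasing order.
val Array(u, s, v) = Singular.fullSVD(a)
println("singular values: " + s)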
Re: Spark Matrix Factorization
Hi Ameet,

Matrix factorization is a non-convex problem. ALS solves it by alternating between two convex subproblems, while DSGD solves it by finding a local minimum. I am experimenting with Spark's parallel ALS, but I intend to port Scalafish (https://github.com/azymnis/scalafish) to Spark as well. For bigger matrices, the jury is still out on which algorithm provides a better local optimum within a given iteration bound; I believe it is also highly dependent on the dataset.

Thanks.
Deb

On Thu, Jan 2, 2014 at 4:06 PM, Ameet Talwalkar am...@eecs.berkeley.edu wrote:

Hi Deb,

Thanks for your email. We currently do not have a DSGD implementation in MLlib. Also, just to clarify, DSGD is not a variant of ALS, but rather a different algorithm for solving the same bi-convex objective function. It would be a good thing to add, but to the best of my knowledge, no one is actively working on this right now. Also, as you mentioned, the ALS implementation in MLlib is more robust/scalable than the one in spark.examples.

-Ameet

On Thu, Jan 2, 2014 at 3:16 PM, Debasish Das debasish.da...@gmail.com wrote:

Hi,

I am not noticing any DSGD implementation of ALS in Spark. There are two ALS implementations: org.apache.spark.examples.SparkALS does not run on large matrices and seems more like demo code, while org.apache.spark.mllib.recommendation.ALS looks and feels like a more robust version, and I am experimenting with it. References here are Jellyfish, Twitter's implementation of Jellyfish called Scalafish (https://github.com/azymnis/scalafish), a Google paper called Sparkler, and a similar idea put forward in the IBM paper by Gemulla et al. (Large-Scale Matrix Factorization with Distributed Stochastic Gradient Descent). Are there any plans to add DSGD to Spark, or are there any existing JIRAs?

Thanks.
Deb
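[Editor's sketch] For reference, a hedged sketch of driving the org.apache.spark.mllib.recommendation.ALS implementation mentioned above; the input path, rating format, and parameter values (rank, iterations, lambda) are illustrative placeholders, not tuned choices:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.{ALS, Rating}

val sc = new SparkContext("spark://master:7077", "ALSExperiment")

// Assumes input lines of the form "user,product,rating".
val ratings = sc.textFile("hdfs:///data/ratings")
  .map(_.split(','))
  .map(f => Rating(f(0).toInt, f(1).toInt, f(2).toDouble))

// rank = 20 latent factors, 10 iterations, lambda = 0.01.
val model = ALS.train(ratings, 20, 10, 0.01)

// Score a single (user, product) pair from the learned factors.
println(model.predict(1, 42))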
Data locality during Spark RDD creation
Hi,

I have HDFS and MapReduce running on 20 nodes, and an experimental Spark cluster running on a subset of the HDFS nodes (say 8 of them). If some ETL is done using MR, the data will most likely be replicated across all 20 nodes (assuming I used all the nodes). Is it a good idea to run the Spark cluster on all 20 nodes where HDFS is running, so that all the RDDs are data-local and bulk data transfer is minimized?

Thanks.
Deb
Standalone spark cluster dead nodes
Hi,

I have been running a standalone Spark cluster, but sometimes I see dead nodes. The physical machines are not dead; rather, the worker JVM dies. Is there a mechanism that automatically restarts the worker JVM if it dies?

Thanks.
Deb
Re: Tree classifiers in MLib
Hi Evan,

Could you please point to the git repo for the decision tree classifier, or to the enhancement JIRA?

Thanks.
Deb

On Dec 29, 2013 8:55 AM, Evan Sparks evan.spa...@gmail.com wrote:

Yes - Manish Amde and Hirakendu Das have been working on a distributed tree classifier. We are taking the current version through large-scale testing and expect to merge it into the master branch soon. I expect that ensembled tree learners (random forests, GBDTs) will follow shortly.

On Dec 29, 2013, at 10:35 AM, Charles Earl charles.ce...@gmail.com wrote:

In the latest API docs off of the web page (http://spark.incubator.apache.org/docs/latest/api/mllib/index.html#org.apache.spark.mllib.package) I had not seen tree classifiers included. Are there plans to include decision trees etc. at some point? Is there any interest?

--
- Charles
Worker failure
Hi,

I started a standalone Spark cluster with 4 nodes, each running around 8 cores, and I want to scale it to an even larger number of nodes. The issue is that I am noticing a lot of worker failures. I believe I am looking for ZooKeeper-based coordination, where if a worker fails, another one is added from a pool of hosts managed through ZooKeeper. I am building from the GitHub master. Can I use the ZooKeeper patch for a standalone cluster? https://github.com/apache/incubator-spark/pull/19

Thanks.
Deb
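[Editor's sketch] If that patch is in your build, a hedged sketch of the configuration it introduces; note the patch targets standalone *master* fault tolerance through ZooKeeper leader election rather than worker restarts, the property names are taken from the patch, and the ZooKeeper quorum address below is a placeholder:

# spark-env.sh on each master node (a sketch, assuming the patched build)
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 -Dspark.deploy.zookeeper.dir=/spark"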
Spark shell vs Spark job
Hi,

I have the equivalent code written as a Spark script and as a Spark job. My script runs 3x faster than the job. Any idea why I am noticing this discrepancy? Is the Spark shell using Kryo serialization by default?

Spark shell (using the script ./wordcount.scala):

SPARK_MEM=2g ./spark-shell
scala> :load wordcount.scala
Loading wordcount.scala...
inputPath: String = hdfs://x.com:9000/sandbox/data/wordcount/input
outputPath: String = hdfs://x.com:9000/sandbox/data/wordcount/output_spark
start: Long = 1387388284050
file: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at console:14
words: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[3] at map at console:17
counts: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at reduceByKey at console:18
end: Long = 1387388301740

Non-cached wordcount runtime: 17 sec

Spark job (using org.apache.spark.examples.HdfsWordCount):

[debasish@istgbd011 sag_spark]$ SPARK_MEM=2g ./run-example org.apache.spark.examples.HdfsWordCount spark://x.com:7077 hdfs://x.com:9000/sandbox/data/wordcount/input hdfs://x.com:9000/sandbox/data/wordcount/output_spark

Non-cached wordcount runtime: 53 sec

I like the 17 sec runtime, since it is around 3x faster than the exact same code in Scalding, and I have not yet used the caching feature.

Thanks.
Deb
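[Editor's sketch] To rule out serializer differences when comparing, a hedged sketch of enabling Kryo explicitly in a standalone job, in the 0.8-era system-property style; the word-count body is illustrative and the master URL and paths come from args:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object HdfsWordCountKryo {
  def main(args: Array[String]) {
    // Must be set before the SparkContext is created.
    System.setProperty("spark.serializer",
      "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(args(0), "HdfsWordCountKryo")
    val counts = sc.textFile(args(1))
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile(args(2))
  }
}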