Rank for SQL and ORDER BY?
I’m trying to do a simple graph sort in Spark, which I mostly have working. The one problem I have now is that I need to order the results and then assign a rank position: the top item should have rank 0, the next one rank 1, and so on. Hive and Pig support this with the RANK operator. I *think* this is how I would do it with Hive:

    SELECT target, COUNT(source) AS indegree,
           rank() OVER (ORDER BY indegree DESC) AS rank
    FROM mygraph
    GROUP BY target
    ORDER BY indegree DESC

But that doesn’t seem to work. What’s the easiest way to accomplish this in Spark? Any advice?

-- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts http://spinn3r.com
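Absent window functions, one common way to get a 0-based rank in Spark is to sort by the count and zip with an index (sortBy plus zipWithIndex on an RDD). A minimal sketch of the same logic in plain Python, no cluster required; the edge list and names are made up for illustration:

```python
from collections import Counter

# Hypothetical edge list: (source, target) pairs.
edges = [("a", "x"), ("b", "x"), ("c", "x"), ("a", "y"), ("b", "y"), ("c", "z")]

# Count in-degree per target, as the GROUP BY would.
indegree = Counter(target for _, target in edges)

# Sort by in-degree descending and enumerate: the index is the rank.
# In Spark this is rdd.sortBy(lambda kv: -kv[1]).zipWithIndex().
ranked = [(target, count, rank)
          for rank, (target, count) in enumerate(
              sorted(indegree.items(), key=lambda kv: -kv[1]))]

print(ranked)  # x gets rank 0, y rank 1, z rank 2
```

The same idea works at scale because the sort and the index assignment are both distributed operations.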
Re: Discrepancy in PCA values
Hi Xiangrui, Thanks for the reply. The Julia code is also using the covariance matrix: (1/n)*X'*X. Thanks, Upul

On Fri, Jan 9, 2015 at 2:11 AM, Xiangrui Meng men...@gmail.com wrote: The Julia code is computing the SVD of the Gram matrix. PCA should be applied to the covariance matrix. -Xiangrui

On Thu, Jan 8, 2015 at 8:27 AM, Upul Bandara upulband...@gmail.com wrote: Hi All, I tried to do PCA for the Iris dataset [https://archive.ics.uci.edu/ml/datasets/Iris] using MLlib [http://spark.apache.org/docs/1.1.1/mllib-dimensionality-reduction.html]. PCA was also calculated in Julia using the following method:

    Sigma = (1/numRow(X))*X'*X;
    [U, S, V] = svd(Sigma);
    Ureduced = U(:, 1:k);
    Z = X*Ureduced;

However, I'm seeing a small difference between the values given by MLlib and the method shown above. Does anyone have any idea about this difference? Additionally, I have attached two visualizations related to the two approaches. Thanks, Upul

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Web Service + Spark
Cui Lin, The solution largely depends on how you want your services deployed (Java web container, Spray framework, etc...) and if you are using a cluster manager like Yarn or Mesos vs. just firing up your own executors and master. I recently worked on an example for deploying Spark services inside of Jetty using Yarn as the cluster manager. It forced me to learn how Spark wires up the dependencies/classpaths. If it helps, the example that resulted from my tinkering is located at [1]. [1] https://github.com/calrissian/spark-jetty-server On Fri, Jan 9, 2015 at 9:33 PM, Cui Lin cui@hds.com wrote: Hello, All, What’s the best practice on deploying/publishing spark-based scientific applications into a web service? Similar to Shiny on R. Thanks! Best regards, Cui Lin
RE: Implement customized Join for SparkSQL
Hi, Rishi You are right. But the ids may number in the tens of thousands, and B is a database with an index on id, which means querying by id is very fast. In fact we load A and B as separate SchemaRDDs as you suggested. But we hope we can extend the join implementation to achieve this in the parsing stage. Best Regards, Kevin

From: Rishi Yadav [mailto:ri...@infoobjects.com] Sent: 2015年1月9日 6:52 To: Dai, Kevin Cc: user@spark.apache.org Subject: Re: Implement customized Join for SparkSQL

Hi Kevin, Say A has 10 ids, so you are pulling data from B's data source only for these 10 ids? What if you load A and B as separate SchemaRDDs and then do the join? Spark will optimize the plan anyway when an action is fired.

On Mon, Jan 5, 2015 at 2:28 AM, Dai, Kevin yun...@ebay.com wrote: Hi, All Suppose I want to join two tables A and B as follows:

    SELECT * FROM A JOIN B ON A.id = B.id

A is a file, while B is a database indexed by id, which I wrapped with the Data Source API. The desired join flow is:
1. Generate A's RDD[Row].
2. Generate B's RDD[Row] from A, by using A's ids and B's data source API to fetch the matching rows from the database.
3. Merge these two RDDs into the final RDD[Row].

However, the existing join strategies don't seem to support this. Is there any way to achieve it? Best Regards, Kevin.
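The lookup-style join described above (drive the reads of B by A's ids instead of scanning all of B) can be sketched in plain Python. The data and the `b_index` dict standing in for the database's id index are made up for illustration:

```python
# A: rows from a file, keyed by id. B: a store with fast lookup by id
# (here just a dict standing in for the indexed database).
a_rows = [(1, "a1"), (2, "a2"), (4, "a4")]
b_index = {1: "b1", 2: "b2", 3: "b3"}

# Instead of scanning all of B, probe it only with A's ids --
# the "query B by index for each id in A" plan the thread describes.
joined = [(id_, a_val, b_index[id_])
          for id_, a_val in a_rows
          if id_ in b_index]

print(joined)  # only ids present on both sides survive, as in an inner join
```

In real Spark code the probing would happen per partition of A (e.g. inside mapPartitions), so each task opens one connection and issues batched indexed lookups rather than one query per row.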
/tmp directory fills up
Gents, I'm building Spark from the current master branch and deploying it to Google Compute Engine on top of Hadoop 2.4/YARN via bdutil, Google's Hadoop cluster provisioning tool. bdutil configures Spark with spark.local.dir=/hadoop/spark/tmp, but this option is ignored in combination with YARN. bdutil also configures YARN with:

    <property>
      <name>yarn.nodemanager.local-dirs</name>
      <value>/mnt/pd1/hadoop/yarn/nm-local-dir</value>
      <description>
        Directories on the local machine in which to store application temp files.
      </description>
    </property>

This is the right directory for Spark to store temporary data in. Still, Spark is creating directories such as /tmp/spark-51388ee6-9de6-411d-b9b9-ab6f9502d01e and filling them up with gigabytes' worth of output files, filling up the very small root filesystem. How can I diagnose why my Spark installation is not picking up yarn.nodemanager.local-dirs from YARN? Alex
Re: DeepLearning and Spark ?
Not if broadcast can only be used between stages. To enable this you would have to at least make broadcast asynchronous and non-blocking.

On 9 January 2015 at 18:02, Krishna Sankar ksanka...@gmail.com wrote: I am also looking at this domain. We could potentially use the broadcast capability in Spark to distribute the parameters. Haven't thought it through yet. Cheers k/

On Fri, Jan 9, 2015 at 2:56 PM, Andrei faithlessfri...@gmail.com wrote: Does it make sense to use Spark's actor system (e.g. via SparkContext.env.actorSystem) to create a parameter server?

On Fri, Jan 9, 2015 at 10:09 PM, Peng Cheng rhw...@gmail.com wrote: You are not the first :) and probably not the fifth to have this question. A parameter server is not included in the Spark framework, and I've seen all kinds of hacking to improvise one: REST APIs, HDFS, Tachyon, etc. Not sure if an 'official' benchmark implementation will be released soon.

On 9 January 2015 at 10:59, Marco Shaw marco.s...@gmail.com wrote: Pretty vague on details: http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199

On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, Deep learning algorithms are popular and achieve state-of-the-art performance on several real-world machine learning problems. Currently there is no DL implementation in Spark, and I wonder if there is ongoing work on this topic. We can do DL in Spark with Sparkling Water and H2O, but this adds an additional software stack. Deeplearning4j seems to implement a distributed version of many popular DL algorithms, and porting DL4j to Spark could be interesting. Google describes an implementation of large-scale DL, based on model parallelism and data parallelism, in this paper: http://research.google.com/archive/large_deep_networks_nips2012.html. So, I'm trying to imagine what a good design for a DL algorithm in Spark would be. Spark already has RDDs (for data parallelism). Can GraphX be used for the model parallelism (as DNNs are generally designed as DAGs)? And what about using GPUs for local parallelism (a mechanism to push partitions into GPU memory)? What do you think about this? Cheers, Jao
OOM exception during row deserialization
Hi, I am using Spark 1.0.1 and trying to debug an OOM exception I saw during a join step. Basically, I have an RDD of rows that I am joining with another RDD of tuples. Some of the tasks succeed, but a fair number fail with an OOM exception and the stack below. The stack belongs to the 'reducer' that is reading shuffle output from the 'mapper'. My question is: what is the object being deserialized here -- just a portion of an RDD, or the whole RDD partition assigned to the current reducer? The rows in the RDD could be large, but definitely not something that would run to 100s of MBs in size and thus run out of memory. Also, is there a way to determine the size of the object being deserialized that results in the error (either by looking at some staging HDFS dir or logs)?

    java.lang.OutOfMemoryError: GC overhead limit exceeded
        java.util.Arrays.copyOf(Arrays.java:2367)
        java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
        java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
        java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
        java.lang.StringBuilder.append(StringBuilder.java:204)
        java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3142)
        java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3050)
        java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2863)
        java.io.ObjectInputStream.readString(ObjectInputStream.java:1636)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1339)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        java.util.ArrayList.readObject(ArrayList.java:771)
        sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.lang.reflect.Method.invoke(Method.java:606)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
        org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
        org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)

Thanks, pala
Web Service + Spark
Hello, All, What’s the best practice on deploying/publishing spark-based scientific applications into a web service? Similar to Shiny on R. Thanks! Best regards, Cui Lin
Re: OptionalDataException during Naive Bayes Training
How big is your data? Did you see other error messages from the executors? It seems to me like a shuffle communication error. This thread may be relevant: http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3ccalrnvjuvtgae_ag1rqey_cod1nmrlfpesxgsb7g8r21h0bm...@mail.gmail.com%3E -Xiangrui

On Fri, Jan 9, 2015 at 3:19 AM, jatinpreet jatinpr...@gmail.com wrote: Hi, I am using Spark version 1.1 in standalone mode on the cluster. Sometimes, during Naive Bayes training, I get an OptionalDataException at the line: map at NaiveBayes.scala:109. I am getting the following exception on the console:

    java.io.OptionalDataException:
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1371)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        java.util.HashMap.readObject(HashMap.java:1394)
        sun.reflect.GeneratedMethodAccessor626.invoke(Unknown Source)
        sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.lang.reflect.Method.invoke(Method.java:483)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

What could be the reason behind this? Thanks

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/OptionalDataException-during-Naive-Bayes-Training-tp21059.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
How to use BigInteger for userId and productId in collaborative Filtering?
Hi, The userIds and productIds in my data are BigInts. What is the best way to run collaborative filtering on this data? Should I modify MLlib's implementation to support more types, or is there an easier way? Thanks! Nishanth

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-BigInteger-for-userId-and-productId-in-collaborative-Filtering-tp21072.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Issue writing to Cassandra from Spark
Hi, We are currently using Spark to join data in Cassandra and then write the results back into Cassandra. While reads happen without any error, during the writes we see many exceptions like the one below. Our environment details are:
- Spark v1.1.0
- spark-cassandra-connector-java_2.10 v1.1.0

We are using the below settings for the writer:

    spark.cassandra.output.batch.size.rows=1
    spark.cassandra.output.concurrent.writes=1

    com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: [] - use getErrors() for details)
        at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:108)
        at com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:179)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Thanks Ankur
Re: How to use BigInteger for userId and productId in collaborative Filtering?
Do you have more than 2 billion users/products? If not, you can pair each user/product id with an integer (check RDD.zipWithUniqueId), use those in ALS, and then join the original BigInt IDs back after training. -Xiangrui

On Fri, Jan 9, 2015 at 5:12 PM, nishanthps nishant...@gmail.com wrote: Hi, The userIds and productIds in my data are BigInts. What is the best way to run collaborative filtering on this data? Should I modify MLlib's implementation to support more types, or is there an easier way? Thanks! Nishanth

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-BigInteger-for-userId-and-productId-in-collaborative-Filtering-tp21072.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
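The remapping Xiangrui describes (assign small integer ids, train on those, join the originals back) can be sketched without a cluster. The ratings data below is made up; in Spark the dicts would be joins against an id-mapping RDD produced by zipWithUniqueId:

```python
# Ratings keyed by BigInt-style ids too large for ALS's Int ids.
ratings = [(10**15 + 7, 10**18 + 3, 4.0), (10**15 + 8, 10**18 + 3, 2.5)]

# Assign each distinct user/product id a small integer,
# playing the role of RDD.zipWithUniqueId.
user_ids = {u: i for i, u in enumerate(sorted({u for u, _, _ in ratings}))}
item_ids = {p: i for i, p in enumerate(sorted({p for _, p, _ in ratings}))}

# Train ALS on the int-keyed view ...
int_ratings = [(user_ids[u], item_ids[p], r) for u, p, r in ratings]

# ... then map predictions back to the original BigInt ids.
users_back = {i: u for u, i in user_ids.items()}
items_back = {i: p for p, i in item_ids.items()}
restored = [(users_back[u], items_back[p], r) for u, p, r in int_ratings]

assert restored == ratings  # round trip is lossless
```

The only requirement is that the mapping be kept around (or be reproducible) so predictions can be translated back.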
Re: Web Service + Spark
You can also look at Spark Job Server https://github.com/spark-jobserver/spark-jobserver - Gaurav On Jan 9, 2015, at 10:25 PM, Corey Nolet cjno...@gmail.com wrote: Cui Lin, The solution largely depends on how you want your services deployed (Java web container, Spray framework, etc...) and if you are using a cluster manager like Yarn or Mesos vs. just firing up your own executors and master. I recently worked on an example for deploying Spark services inside of Jetty using Yarn as the cluster manager. It forced me to learn how Spark wires up the dependencies/classpaths. If it helps, the example that resulted from my tinkering is located at [1]. [1] https://github.com/calrissian/spark-jetty-server On Fri, Jan 9, 2015 at 9:33 PM, Cui Lin cui@hds.com wrote: Hello, All, What’s the best practice on deploying/publishing spark-based scientific applications into a web service? Similar to Shiny on R. Thanks! Best regards, Cui Lin
Re: Discrepancy in PCA values
You need to subtract the mean values to obtain the covariance matrix (http://en.wikipedia.org/wiki/Covariance_matrix).

On Fri, Jan 9, 2015 at 6:41 PM, Upul Bandara upulband...@gmail.com wrote: Hi Xiangrui, Thanks for the reply. The Julia code is also using the covariance matrix: (1/n)*X'*X. Thanks, Upul

On Fri, Jan 9, 2015 at 2:11 AM, Xiangrui Meng men...@gmail.com wrote: The Julia code is computing the SVD of the Gram matrix. PCA should be applied to the covariance matrix. -Xiangrui

On Thu, Jan 8, 2015 at 8:27 AM, Upul Bandara upulband...@gmail.com wrote: Hi All, I tried to do PCA for the Iris dataset [https://archive.ics.uci.edu/ml/datasets/Iris] using MLlib [http://spark.apache.org/docs/1.1.1/mllib-dimensionality-reduction.html]. PCA was also calculated in Julia using the following method:

    Sigma = (1/numRow(X))*X'*X;
    [U, S, V] = svd(Sigma);
    Ureduced = U(:, 1:k);
    Z = X*Ureduced;

However, I'm seeing a small difference between the values given by MLlib and the method shown above. Does anyone have any idea about this difference? Additionally, I have attached two visualizations related to the two approaches. Thanks, Upul
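On the mean-subtraction point: (1/n)X'X equals the covariance matrix only when the columns already have zero mean, which is why the Julia snippet computes the Gram matrix instead. A tiny plain-Python check on made-up 2x2 data:

```python
# Tiny made-up data matrix X (rows = samples, columns = features).
X = [[1.0, 2.0], [3.0, 6.0]]
n = len(X)

def gram(M):
    # (1/n) * M' * M, which is what the Julia snippet computes.
    cols = list(zip(*M))
    return [[sum(a * b for a, b in zip(ci, cj)) / n for cj in cols] for ci in cols]

# Covariance needs the column means subtracted first.
means = [sum(col) / n for col in zip(*X)]
centered = [[x - m for x, m in zip(row, means)] for row in X]

print(gram(X))        # Gram matrix of the raw data
print(gram(centered)) # the covariance matrix -- different unless means are 0
```

Since the Iris features do not have zero mean, the principal directions from the two matrices differ, which explains the discrepancy in the thread.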
Re: Accidental kill in UI
As Sean said, this definitely sounds like something worth a JIRA issue (and PR).

On Fri Jan 09 2015 at 8:17:34 AM Sean Owen so...@cloudera.com wrote: (FWIW, yes, I think this should certainly be a POST. The link can become a miniature form to achieve this, and then the endpoint just needs to accept POST only. You should propose a pull request.)

On Fri, Jan 9, 2015 at 12:51 PM, Joe Wass jw...@crossref.org wrote: So I had a Spark job with various failures, and I decided to kill it and start again. I clicked the 'kill' link in the web console, restarted the job on the command line, and headed back to the web console and refreshed to see how my job was doing... the URL at the time was:

    /stages/stage/kill?id=1&terminate=true

which of course terminated the stage again. No loss, but if I'd waited a few hours before doing that, I would have lost data. I know to be careful next time, but isn't 'don't modify state as a result of a GET request' the first rule of HTTP? It could lead to an expensive mistake. Making this a POST would be a simple fix. Does anyone else think this is worth creating an issue for?
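The POST-only fix Sean suggests can be illustrated with a toy router. This is not Spark's actual UI code; the path, handler, and state are illustrative only, showing the rule that a GET must stay side-effect free:

```python
# Toy router: the kill endpoint accepts POST only, so a refreshed (GET)
# URL can no longer kill a stage. Names/paths are hypothetical.
killed = set()

def handle(method, path, params):
    if path == "/stages/stage/kill":
        if method != "POST":
            return 405  # Method Not Allowed: GETs must not change state
        killed.add(params["id"])
        return 200
    return 404

assert handle("GET", "/stages/stage/kill", {"id": 1}) == 405
assert 1 not in killed  # the accidental refresh does nothing
assert handle("POST", "/stages/stage/kill", {"id": 1}) == 200
assert 1 in killed
```

In the UI, the 'kill' link then becomes a tiny form whose submit issues the POST, exactly as described above.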
Re: calculating the mean of SparseVector RDD
colStats() computes the mean values along with several other summary statistics, which makes it slower. How is the performance if you don't use Kryo? -Xiangrui

On Fri, Jan 9, 2015 at 3:46 AM, Rok Roskar rokros...@gmail.com wrote: Thanks for the suggestion -- however, this looks even slower. With the small data set I'm using, my aggregate function takes ~9 seconds and colStats().mean() takes ~1 minute. However, I can't get it to run with the Kryo serializer -- I get the error:

    com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5, required: 8

Is there an easy/obvious fix?

On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng men...@gmail.com wrote: There is some serialization overhead. You can try https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107 . -Xiangrui

On Wed, Jan 7, 2015 at 9:42 AM, rok rokros...@gmail.com wrote: I have an RDD of SparseVectors and I'd like to calculate the mean, returning a dense vector. I've tried doing this with the following (using PySpark, Spark v1.2.0):

    def aggregate_partition_values(vec1, vec2):
        vec1[vec2.indices] += vec2.values
        return vec1

    def aggregate_combined_vectors(vec1, vec2):
        if all(vec1 == vec2):
            # then the vector came from only one partition
            return vec1
        else:
            return vec1 + vec2

    means = vals.aggregate(np.zeros(vec_len),
                           aggregate_partition_values,
                           aggregate_combined_vectors)
    means = means / nvals

This turns out to be really slow -- and doesn't seem to depend on how many vectors there are, so there seems to be some overhead somewhere that I'm not understanding. Is there a better way of doing this?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
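The aggregate in the quoted code can also be written without the fragile "came from only one partition" check: use a zero vector as the neutral element, and let the combine step be plain elementwise addition. A plain-Python sketch of the same fold over made-up (indices, values) sparse vectors:

```python
vec_len = 5
# Made-up sparse vectors as (indices, values) pairs.
vectors = [([0, 3], [1.0, 2.0]), ([3], [4.0]), ([1], [3.0])]

def seq_op(acc, vec):
    # Add one sparse vector into the dense accumulator.
    indices, values = vec
    for i, v in zip(indices, values):
        acc[i] += v
    return acc

def comb_op(a, b):
    # Merging two partition accumulators is plain elementwise addition;
    # an empty partition contributes its zero vector, so no special case.
    return [x + y for x, y in zip(a, b)]

# Two "partitions": the first holds vectors[0], the second the rest.
total = comb_op(seq_op([0.0] * vec_len, vectors[0]),
                seq_op(seq_op([0.0] * vec_len, vectors[1]), vectors[2]))
mean = [x / len(vectors) for x in total]
print(mean)  # index 3 sums 2.0 + 4.0, so its mean is 2.0
```

This is the same shape as rdd.aggregate(zero, seq_op, comb_op) in PySpark, where the zero value is np.zeros(vec_len).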
Submitting SparkContext and seeing driverPropsFetcher exception
I'm seeing this exception when creating a new SparkContext in YARN:

    [ERROR] AssociationError [akka.tcp://sparkdri...@coreys-mbp.home:58243] -> [akka.tcp://driverpropsfetc...@coreys-mbp.home:58453]:
    Error [Shut down address: akka.tcp://driverpropsfetc...@coreys-mbp.home:58453]
    [akka.remote.ShutDownAssociation: Shut down address: akka.tcp://driverpropsfetc...@coreys-mbp.home:58453
    Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.

Any ideas on what this could be?
Re: Zipping RDDs of equal size not possible
"sample 2 * n tuples, split them into two parts, balance the sizes of these parts by filtering some tuples out" -- How do you guarantee that the two RDDs have the same size? -Xiangrui

On Fri, Jan 9, 2015 at 3:40 AM, Niklas Wilcke 1wil...@informatik.uni-hamburg.de wrote: Hi Spark community, I have a problem with zipping two RDDs of the same size and the same number of partitions. The error message says that zipping is only allowed on RDDs which are partitioned into chunks of exactly the same sizes. How can I ensure this? My workaround at the moment is to repartition both RDDs to only one partition, but that obviously does not scale. This problem originates from my need to draw n random tuple pairs (Tuple, Tuple) from an RDD[Tuple]. What I do is sample 2 * n tuples, split them into two parts, balance the sizes of these parts by filtering some tuples out, and zip them together. I would appreciate reading about better approaches to both problems. Thanks in advance, Niklas
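One way to draw n random pairs without the fragile balancing-and-zipping step is to sample 2n tuples, attach an index (zipWithIndex), and pair element 2i with element 2i+1; the two halves then have equal size by construction. A plain-Python sketch; the data and seed are made up:

```python
import random

data = list(range(100))  # stand-in for the RDD[Tuple]
n = 5

rng = random.Random(42)
sampled = rng.sample(data, 2 * n)  # like takeSample(False, 2 * n)

# Pair consecutive elements: index 2i with index 2i + 1.
# In Spark: sampled.zipWithIndex(), then group by index // 2.
pairs = [(sampled[2 * i], sampled[2 * i + 1]) for i in range(n)]

assert len(pairs) == n  # equal halves by construction, nothing filtered out
```

Because each pair comes from one indexed list rather than two separately-filtered RDDs, there is no chance of a size mismatch at zip time.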
Re: Re: EventBatch and SparkFlumeProtocol not found in spark codebase?
Thanks Sean. I followed the guide and imported the codebase into IntelliJ IDEA as a Maven project, with the profiles hadoop2.4 and yarn. In the Maven Projects view, I ran Maven Install against the module Spark Project Parent POM (root). After a pretty long time, all the modules built successfully. But when I run the LocalPi example, compile errors emerge:
1. EventBatch and SparkFlumeProtocol don't exist.
2. There are a bunch of errors complaining that q is not a member of StringContext in CodeGenerator.scala.
Then I tried clicking Generate Sources and Update Folders For All Projects and repeated the Maven install... still a successful build, with the compile errors still there. Sean, any guidance on this? Thanks

At 2015-01-09 18:08:11, Sean Owen so...@cloudera.com wrote: What's up with the IJ questions all of a sudden? This PR from yesterday contains a summary of the answer to your question: https://github.com/apache/spark/pull/3952 : Rebuild Project can fail the first time the project is compiled, because generated source files are not automatically generated. Try clicking the Generate Sources and Update Folders For All Projects button in the Maven Projects tool window to manually generate these sources.

On Fri, Jan 9, 2015 at 10:03 AM, bit1...@163.com bit1...@163.com wrote: Hi, When I fetch the Spark code base, import it into IntelliJ IDEA as an SBT project, and build it with SBT, there are compile errors in the examples module complaining that EventBatch and SparkFlumeProtocol cannot be found; it looks like they should be in the org.apache.spark.streaming.flume.sink package. Not sure what happens. Thanks.
Re: Questions about Spark and HDFS co-location
Spark uses MapReduce InputFormat implementations to read data from disk, so in that sense it has access to, and uses, the same locality info that things like MR do. Yes, tasks go to the data, and you want to run Spark on top of the HDFS DataNodes. (Locality isn't always the only priority that determines where tasks are scheduled, but it certainly matters.) I'm not qualified enough to explain it in more detail, compared to others here.

On Fri, Jan 9, 2015 at 10:13 PM, zfry z...@palantir.com wrote: I am running Spark 1.1.1 built against CDH4 and have a few questions regarding Spark performance related to co-location with HDFS nodes. I want to know whether (and how efficiently) Spark takes advantage of being co-located with an HDFS node. What I mean by this is: if a file is being read by a Spark executor and that file (or most of its blocks) is located on an HDFS DataNode on the same machine as a Spark worker, will it read directly off of disk, or does that data have to travel through the network in some way? Is there a distinct advantage to putting HDFS and Spark on the same box if it is possible, or, due to the way blocks are distributed around a cluster, are we so likely to be moving files over the network that co-location doesn't really make that much of a difference? Also, do you know of any papers/books/other resources (other than trying to dig through the Spark code) which do a good job of explaining the Spark/HDFS data workflow (i.e. how data moves from disk -> HDFS -> Spark -> HDFS)? Thanks! Zach

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Spark-and-HDFS-co-location-tp21070.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Questions about Spark and HDFS co-location
I was looking for related information and found: http://spark-summit.org/wp-content/uploads/2013/10/Spark-Ops-Final.pptx See also http://hbase.apache.org/book.html#perf.hdfs.configs.localread for how short circuit read is enabled. Cheers On Fri, Jan 9, 2015 at 3:50 PM, Sean Owen so...@cloudera.com wrote: Spark uses MapReduce InputFormat implementations to read data from disk, so in that sense it has access to, and uses, the same locality info that things like MR do. Yes, tasks go to the data, and you want to run Spark on top of the HDFS DataNodes. (Locality isn't always the only priority that determines where tasks are scheduled, but it certainly matters.) I'm not qualified enough to explain it in more detail, compared to others here. On Fri, Jan 9, 2015 at 10:13 PM, zfry z...@palantir.com wrote: I am running Spark 1.1.1 built against CDH4 and have a few questions regarding Spark performance related to co-location with HDFS nodes. I want to know whether (and how efficiently) Spark takes advantage of being co-located with a HDFS node? What I mean by this is: if a file is being read by a Spark executor and that file (or most of its blocks) is located in a HDFS DataNode on the same machine as a Spark worker, will it read directly off of disk, or does that data have to travel through the network in some way? Is there a distinct advantage to putting HDFS and Spark on the same box if it is possible or, due to the way blocks are distributed about a cluster, are we so likely to be moving files over the network that co-location doesn’t really make that much of a difference? Also, do you know of any papers/books/other resources (other trying to dig through the spark code) which do a good job of explaining the Spark/HDFS data workflow (ie. how data moves from disk - HDFS - Spark - HDFS)? Thanks! 
Zach -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Spark-and-HDFS-co-location-tp21070.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Questions about Spark and HDFS co-location
Note also for short circuit reads that early versions are actually net-negative in performance. Only after a second hadoop release of the feature did it turn towards being a positive change. See earlier threads on this mailing list where short circuit reads are discussed. On Fri, Jan 9, 2015 at 3:57 PM, Ted Yu yuzhih...@gmail.com wrote: I was looking for related information and found: http://spark-summit.org/wp-content/uploads/2013/10/Spark-Ops-Final.pptx See also http://hbase.apache.org/book.html#perf.hdfs.configs.localread for how short circuit read is enabled. Cheers On Fri, Jan 9, 2015 at 3:50 PM, Sean Owen so...@cloudera.com wrote: Spark uses MapReduce InputFormat implementations to read data from disk, so in that sense it has access to, and uses, the same locality info that things like MR do. Yes, tasks go to the data, and you want to run Spark on top of the HDFS DataNodes. (Locality isn't always the only priority that determines where tasks are scheduled, but it certainly matters.) I'm not qualified enough to explain it in more detail, compared to others here. On Fri, Jan 9, 2015 at 10:13 PM, zfry z...@palantir.com wrote: I am running Spark 1.1.1 built against CDH4 and have a few questions regarding Spark performance related to co-location with HDFS nodes. I want to know whether (and how efficiently) Spark takes advantage of being co-located with a HDFS node? What I mean by this is: if a file is being read by a Spark executor and that file (or most of its blocks) is located in a HDFS DataNode on the same machine as a Spark worker, will it read directly off of disk, or does that data have to travel through the network in some way? Is there a distinct advantage to putting HDFS and Spark on the same box if it is possible or, due to the way blocks are distributed about a cluster, are we so likely to be moving files over the network that co-location doesn’t really make that much of a difference? 
Also, do you know of any papers/books/other resources (other than trying to dig through the Spark code) which do a good job of explaining the Spark/HDFS data workflow (i.e. how data moves from disk -> HDFS -> Spark -> HDFS)? Thanks! Zach

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Spark-and-HDFS-co-location-tp21070.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
DeepLearning and Spark ?
Hi all, Deep learning algorithms are popular and achieve state-of-the-art performance on several real-world machine learning problems. Currently there is no DL implementation in Spark, and I wonder if there is ongoing work on this topic. We can do DL in Spark with Sparkling Water and H2O, but this adds an additional software stack. Deeplearning4j seems to implement a distributed version of many popular DL algorithms, and porting DL4j to Spark could be interesting. Google describes an implementation of large-scale DL, based on model parallelism and data parallelism, in this paper: http://research.google.com/archive/large_deep_networks_nips2012.html. So, I'm trying to imagine what a good design for a DL algorithm in Spark would be. Spark already has RDDs (for data parallelism). Can GraphX be used for the model parallelism (as DNNs are generally designed as DAGs)? And what about using GPUs for local parallelism (a mechanism to push partitions into GPU memory)? What do you think about this? Cheers, Jao
Re: DeepLearning and Spark ?
Pretty vague on details: http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199 On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, DeepLearning algorithms are popular and achieve state-of-the-art performance on several real-world machine learning problems. Currently there is no DL implementation in Spark and I wonder if there is ongoing work on this topic. We can do DL in Spark with Sparkling Water and H2O, but this adds an additional software stack. Deeplearning4j seems to implement a distributed version of many popular DL algorithms. Porting DL4j to Spark could be interesting. Google describes an implementation of large-scale DL in this paper http://research.google.com/archive/large_deep_networks_nips2012.html, based on model parallelism and data parallelism. So, I'm trying to imagine what a good design for DL algorithms in Spark would be. Spark already has RDDs (for data parallelism). Can GraphX be used for the model parallelism (as DNNs are generally designed as DAGs)? And what about using GPUs for local parallelism (a mechanism to push partitions into GPU memory)? What do you think about this? Cheers, Jao
How to access OpenHashSet in my standalone program?
Hi, I would like to use OpenHashSet (org.apache.spark.util.collection.OpenHashSet) in my standalone program. I can import it without error as: import org.apache.spark.util.collection.OpenHashSet However, when I try to access it, I get an error: object OpenHashSet in package collection cannot be accessed in package org.apache.spark.util.collection I suspect this error is caused by the object being declared private to Spark. I am wondering how I can use this object in my standalone program. Thanks, Ted -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-OpenHashSet-in-my-standalone-program-tp21065.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
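For what it's worth, OpenHashSet is declared private[spark], so one unsupported workaround is to compile your own code inside a package under org.apache.spark. A sketch — the package and object names here are arbitrary choices of mine, and this ties you to Spark internals that can change without notice between releases:

```scala
// Hypothetical workaround: private[spark] members are visible to code
// that lives in a subpackage of org.apache.spark. Unsupported and fragile.
package org.apache.spark.util.myapp

import org.apache.spark.util.collection.OpenHashSet

object OpenHashSetDemo {
  def main(args: Array[String]): Unit = {
    val set = new OpenHashSet[Int]()
    set.add(1)
    set.add(2)
    set.add(1) // duplicate, ignored
    println(set.size) // number of distinct elements
  }
}
```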
RE: Cleaning up spark.local.dir automatically
Thanks, I imagine this will kill any cached RDDs if their files are beyond the ttl? Thanks From: Raghavendra Pandey [mailto:raghavendra.pan...@gmail.com] Sent: 09 January 2015 15:29 To: England, Michael (IT/UK); user@spark.apache.org Subject: Re: Cleaning up spark.local.dir automatically You may like to look at the spark.cleaner.ttl configuration, which is infinite by default. Spark has that configuration to delete temp files from time to time. On Fri Jan 09 2015 at 8:34:10 PM michael.engl...@nomura.com wrote: Hi, Is there a way of automatically cleaning up spark.local.dir after a job has been run? I have noticed a large number of temporary files have been stored here and are not cleaned up. The only solution I can think of is to run some sort of cron job to delete files older than a few days. I am currently using a mixture of standalone and YARN Spark builds. Thanks, Michael This e-mail (including any attachments) is private and confidential, may contain proprietary or privileged information and is intended for the named recipient(s) only. Unintended recipients are strictly prohibited from taking action on the basis of information in this e-mail and must contact the sender immediately, delete this e-mail (and all attachments) and destroy any hard copies. Nomura will not accept responsibility or liability for the accuracy or completeness of, or the presence of any virus or disabling code in, this e-mail. If verification is sought please request a hard copy. Any reference to the terms of executed transactions should be treated as preliminary only and subject to formal written confirmation by Nomura. Nomura reserves the right to retain, monitor and intercept e-mail communications through its networks (subject to and in accordance with applicable laws). No confidentiality or privilege is waived or lost by Nomura by any mistransmission of this e-mail. Any reference to Nomura is a reference to any entity in the Nomura Holdings, Inc. group.
Please read our Electronic Communications Legal Notice which forms part of this e-mail: http://www.Nomura.com/email_disclaimer.htm
Re: Cleaning up spark.local.dir automatically
You may like to look at the spark.cleaner.ttl configuration, which is infinite by default. Spark has that configuration to delete temp files from time to time. On Fri Jan 09 2015 at 8:34:10 PM michael.engl...@nomura.com wrote: Hi, Is there a way of automatically cleaning up spark.local.dir after a job has been run? I have noticed a large number of temporary files have been stored here and are not cleaned up. The only solution I can think of is to run some sort of cron job to delete files older than a few days. I am currently using a mixture of standalone and YARN Spark builds. Thanks, Michael
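A minimal sketch of setting this, with the app name and ttl value being placeholder choices of mine:

```scala
// spark.cleaner.ttl is in seconds. Metadata and temp files older than
// this become eligible for periodic cleanup -- which can include data
// backing cached RDDs, so pick a value longer than any job that reuses them.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cleanup-demo")        // hypothetical app name
  .set("spark.cleaner.ttl", "3600")  // clean up state older than one hour
val sc = new SparkContext(conf)
```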
Re: Set EXTRA_JAR environment variable for spark-jobserver
Boris, Yes, as you mentioned, we are creating a new SparkContext for our job, the reason being to define the Apache Cassandra connection using SparkConf. We hope this should also work. For uploading the JAR, we followed: (1) Package the JAR using the *sbt package* command (2) Use the *curl --data-binary @target/scala-2.10/spark-jobserver-examples_2.10-1.0.0.jar localhost:8090/jars/sparking* command to upload it, as mentioned in the https://github.com/fedragon/spark-jobserver-examples link. We did some samples earlier connecting Apache Cassandra to Spark using Scala. Initially, we faced the same *java.lang.NoClassDefFoundError* exception at run time, and we overcame it using the *--jars {required JAR paths}* option with *spark-submit*. Finally, we were able to run them successfully as regular Spark apps, so we are confident in what has been written for this spark-jobserver. To give you an update, we prepared an uber JAR (a single JAR with all dependencies) as Pankaj mentioned, and are now facing *SparkException: Job aborted due to stage failure*, for which we will raise another post. Thank you once again for your suggestions. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Set-EXTRA-JAR-environment-variable-for-spark-jobserver-tp20989p21054.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Spark History Server can't read event logs
Hi Marcelo, On MapR, the mapr user can read the files using the NFS mount, however using the normal hadoop fs -cat /... command, I get permission denied. As the history server is pointing to a location on mapfs, not the NFS mount, I'd imagine the Spark history server is trying to read the files using the hadoop api and therefore the permissions cause issues here. Thanks, Michael -Original Message- From: Marcelo Vanzin [mailto:van...@cloudera.com] Sent: 08 January 2015 19:23 To: England, Michael (IT/UK) Cc: user@spark.apache.org Subject: Re: Spark History Server can't read event logs Sorry for the noise; but I just remembered you're actually using MapR (and not HDFS), so maybe the 3777 trick could work... On Thu, Jan 8, 2015 at 10:32 AM, Marcelo Vanzin van...@cloudera.com wrote: Nevermind my last e-mail. HDFS complains about not understanding 3777... On Thu, Jan 8, 2015 at 9:46 AM, Marcelo Vanzin van...@cloudera.com wrote: Hmm. Can you set the permissions of /apps/spark/historyserver/logs to 3777? I'm not sure HDFS respects the group id bit, but it's worth a try. (BTW that would only affect newly created log directories.) On Thu, Jan 8, 2015 at 1:22 AM, michael.engl...@nomura.com wrote: Hi Vanzin, I am using the MapR distribution of Hadoop. The history server logs are created by a job with the permissions: drwxrwx--- - myusername mygroup 2 2015-01-08 09:14 /apps/spark/historyserver/logs/spark-1420708455212 However, the permissions of the higher directories are mapr:mapr and the user that runs Spark in our case is a unix ID called mapr (in the mapr group). Therefore, this can't read my job event logs as shown above. Thanks, Michael -Original Message- From: Marcelo Vanzin [mailto:van...@cloudera.com] Sent: 07 January 2015 18:10 To: England, Michael (IT/UK) Cc: user@spark.apache.org Subject: Re: Spark History Server can't read event logs The Spark code generates the log directory with 770 permissions. 
On top of that you need to make sure of two things: - all directories up to /apps/spark/historyserver/logs/ are readable by the user running the history server - the user running the history server belongs to the group that owns /apps/spark/historyserver/logs/ I think the code could be more explicit about setting the group of the generated log directories and files, but if you follow the two rules above things should work. Also, I recommend setting /apps/spark/historyserver/logs/ itself to 1777 so that any user can generate logs, but only the owner (or a superuser) can delete them. On Wed, Jan 7, 2015 at 7:45 AM, michael.engl...@nomura.com wrote: Hi, When I run jobs and save the event logs, they are saved with the permissions of the unix user and group that ran the spark job. The history server is run as a service account and therefore can’t read the files: Extract from the History server logs: 2015-01-07 15:37:24,3021 ERROR Client fs/client/fileclient/cc/client.cc:1009 Thread: 1183 User does not have access to open file /apps/spark/historyserver/logs/spark-1420644521194 15/01/07 15:37:24 ERROR ReplayListenerBus: Exception in parsing Spark event log /apps/spark/historyserver/logs/spark-1420644521194/EVENT_LOG_1 org.apache.hadoop.security.AccessControlException: Open failed for file: /apps/spark/historyserver/logs/spark-1420644521194/EVENT_LOG_1, error: Permission denied (13) Is there a setting which I can change that allows the files to be world readable or at least readable by the account running the history server? Currently, the job appears in the History Server UI but only states ‘Not Started’. Thanks, Michael
-- Marcelo
Re: Did anyone tried overcommit of CPU cores?
Thanks, but how do I increase the tasks per core? For example, if the application claims 10 cores, is it possible to launch 100 tasks concurrently? On Fri, Jan 9, 2015 at 2:57 PM, Jörn Franke jornfra...@gmail.com wrote: Hallo, Based on experience with other software in virtualized environments, I cannot really recommend this. However, I am not sure how Spark reacts. You may face unpredictable task failures depending on utilization, and tasks connecting to external systems (databases etc.) may fail unexpectedly, which might be a problem for them (transactions not finishing etc.). Why not increase the tasks per core? Best regards On 9 Jan 2015 06:46, Xuelin Cao xuelincao2...@gmail.com wrote: Hi, I'm wondering whether it is a good idea to overcommit CPU cores on a Spark cluster. For example, in our testing cluster, each worker machine has 24 physical CPU cores. However, we are allowed to set the CPU core number to 48 or more in the Spark configuration file. As a result, we are allowed to launch more tasks than the number of physical CPU cores. The motivation for overcommitting CPU cores is that, often, a task cannot consume 100% of a single CPU core (due to I/O, shuffle, etc.). So, overcommitting the CPU cores allows more tasks to run at the same time, and makes resources be used more economically. But is there any reason we should not do this? Has anyone tried it?
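On the standalone cluster manager, the overcommit described above amounts to advertising more cores than physically exist; a sketch of the relevant worker setting (the values are illustrative):

```
# conf/spark-env.sh on each worker machine (standalone mode).
# Advertise 48 logical cores on a 24-core box; the master will then
# schedule up to 48 concurrent tasks' worth of cores on this worker.
SPARK_WORKER_CORES=48
```

Note this does not let a single 10-core allocation run 100 tasks: an executor still runs one task per allocated core (divided by spark.task.cpus, which defaults to 1), so to run more concurrent tasks the application must be granted more of the advertised cores.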
Re: SPARKonYARN failing on CDH 5.3.0 : container cannot be fetched because of NumberFormatException
I am using the pre-built *spark-1.2.0-bin-hadoop2.4* from *[1]* to submit Spark applications to YARN; I cannot find a pre-built Spark for *CDH-5.x* versions. So, in my case the org.apache.hadoop.yarn.util.ConverterUtils class is coming from the spark-assembly-1.1.0-hadoop2.4.0.jar which is part of the pre-built Spark, and hence causing this issue. How/where can I get Spark 1.2.0 built for CDH-5.3.0? I checked the Maven repo etc. with no luck. *[1]* https://spark.apache.org/downloads.html On Fri, Jan 9, 2015 at 1:12 AM, Marcelo Vanzin van...@cloudera.com wrote: Just to add to Sandy's comment, check your client configuration (generally in /etc/spark/conf). If you're using CM, you may need to run the Deploy Client Configuration command on the cluster to update the configs to match the new version of CDH. On Thu, Jan 8, 2015 at 11:38 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Mukesh, Those line numbers in ConverterUtils in the stack trace don't appear to line up with CDH 5.3: https://github.com/cloudera/hadoop-common/blob/cdh5-2.5.0_5.3.0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java Is it possible you're still including the old jars on the classpath in some way? -Sandy On Thu, Jan 8, 2015 at 3:38 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hi Experts, I am running Spark inside a YARN job. The spark-streaming job runs fine in CDH-5.0.0, but after the upgrade to 5.3.0 it cannot fetch containers, with the errors below. It looks like the container id is incorrect and a string is present in a place where a number is expected. java.lang.IllegalArgumentException: Invalid ContainerId: container_e01_1420481081140_0006_01_01 Caused by: java.lang.NumberFormatException: For input string: e01 Is this a bug? Did you face something similar, and any ideas how to fix this?
15/01/08 09:50:28 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT] 15/01/08 09:50:29 ERROR yarn.ApplicationMaster: Uncaught exception: java.lang.IllegalArgumentException: Invalid ContainerId: container_e01_1420481081140_0006_01_01 at org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182) at org.apache.spark.deploy.yarn.YarnRMClientImpl.getAttemptId(YarnRMClientImpl.scala:79) at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:79) at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:515) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59) at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:513) at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala) Caused by: java.lang.NumberFormatException: For input string: e01 at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Long.parseLong(Long.java:441) at java.lang.Long.parseLong(Long.java:483) at org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137) at org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177) ... 11 more 15/01/08 09:50:29 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 10, (reason: Uncaught exception: Invalid ContainerId: container_e01_1420481081140_0006_01_01) -- Thanks Regards, Mukesh Jha -- Marcelo -- Thanks Regards, *Mukesh Jha me.mukesh@gmail.com*
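If it helps, Spark of that era could be built from source against a specific CDH Hadoop version rather than using a generic pre-built binary; a sketch, where the exact hadoop.version string for CDH 5.3.0 is my assumption based on the branch name in Sandy's link above (cdh5-2.5.0_5.3.0):

```
# Build Spark 1.2.0 from source against CDH 5.3.0's Hadoop
# (version string assumed; verify against your CDH repo):
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.0 -DskipTests clean package
```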
EventBatch and SparkFlumeProtocol not found in spark codebase?
Hi, When I fetch the Spark code base and import it into IntelliJ IDEA as an SBT project, and then build it with SBT, there are compile errors in the examples module complaining that EventBatch and SparkFlumeProtocol cannot be found; it looks like they should be in the org.apache.spark.streaming.flume.sink package. Not sure what happens. Thanks.
Is It Feasible for Spark 1.1 Broadcast to Fully Utilize the Ethernet Card Throughput?
Guys, I have a question regarding the Spark 1.1 broadcast implementation. In our pipeline, we have a large multi-class LR model, which is about 1 GiB in size. To exploit Spark parallelism, a natural idea is to broadcast this model file to the worker nodes. However, it looks like broadcast performance is not very good. While broadcasting the model file, I monitored the network card throughput of the worker nodes; their recv/write throughput is only around 30~40 MiB (our server boxes are equipped with 100MiB ethernet cards). Is this a real limitation of the Spark 1.1 broadcast implementation? Or is there some configuration or trick that can help Spark broadcast perform better? Thanks -- yangjun...@gmail.com http://hi.baidu.com/yjpro
Re: skipping header from each file
I think this was already answered on Stack Overflow: http://stackoverflow.com/questions/27854919/skipping-header-file-from-each-csv-file-in-spark where the one additional idea would be: if there were just one header line, in the first record, then the most efficient way to filter it out is: rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter } This doesn't help, of course, if there are many files with header lines inside. In that case you can make one such RDD per file and union the 3 RDDs. On Fri, Jan 9, 2015 at 6:18 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Suppose I give three file paths to the Spark context to read, and each file has its schema in the first row. How can we skip the schema lines from the headers? val rdd=sc.textFile("file1,file2,file3") -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/skipping-header-from-each-file-tp21051.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
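Putting the two suggestions together, a sketch for the three-file case, assuming an existing SparkContext named sc and placeholder file paths:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Build one RDD per file so that partition 0 of each RDD starts at that
// file's first line; drop that line, then union the results.
def withoutHeader(sc: SparkContext, path: String): RDD[String] =
  sc.textFile(path).mapPartitionsWithIndex { (idx, iter) =>
    if (idx == 0) iter.drop(1) else iter
  }

val paths = Seq("file1", "file2", "file3") // placeholder paths
val data = paths.map(p => withoutHeader(sc, p)).reduce(_ union _)
```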
RE: skipping header from each file
Maybe you can use the wholeTextFiles method, which returns the filename and the content of the file as a PairRDD, and then you can remove the first line from the files. -----Original Message----- From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com] Sent: Friday, January 09, 2015 11:48 AM To: user@spark.apache.org Subject: skipping header from each file Suppose I give three file paths to the Spark context to read, and each file has its schema in the first row. How can we skip the schema lines from the headers? val rdd=sc.textFile("file1,file2,file3") -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/skipping-header-from-each-file-tp21051.html Sent from the Apache Spark User List mailing list archive at Nabble.com. CAUTION - Disclaimer * This e-mail contains PRIVILEGED AND CONFIDENTIAL INFORMATION intended solely for the use of the addressee(s). If you are not the intended recipient, please notify the sender by e-mail and delete the original message. Further, you are not to copy, disclose, or distribute this e-mail or its contents to any other person and any such actions are unlawful. This e-mail may contain viruses. Infosys has taken every reasonable precaution to minimize this risk, but is not liable for any damage you may sustain as a result of any virus in this e-mail. You should carry out your own virus checks before opening the e-mail or attachment. Infosys reserves the right to monitor and review the content of all messages sent to or from this e-mail address. Messages sent to or from this e-mail address may be stored on the Infosys e-mail system. ***INFOSYS End of Disclaimer INFOSYS***
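A sketch of that idea, assuming an existing SparkContext named sc, a placeholder input directory, and that each file is small enough to be held in memory as a single record (wholeTextFiles reads entire files as single values):

```scala
// Each record is (fileName, fileContents); drop the first line of each
// file's contents and re-split the remainder into lines.
val noHeaders = sc.wholeTextFiles("data/") // placeholder directory
  .flatMap { case (_, contents) =>
    contents.split("\n").drop(1)
  }
```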
Re: java.io.IOException: Mkdirs failed to create file:/some/path/myapp.csv while using rdd.saveAsTextFile(fileAddress) Spark
I am facing the same exception in saveAsObjectFile. Have you found any solution? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-Mkdirs-failed-to-create-file-some-path-myapp-csv-while-using-rdd-saveAsTextFile-k-tp20994p21066.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: TF-IDF from spark-1.1.0 not working on cluster mode
This is the worker log, not the executor log. The executor log can be found in folders like /newdisk2/rta/rtauser/workerdir/app-20150109182514-0001/0/ . -Xiangrui On Fri, Jan 9, 2015 at 5:03 AM, Priya Ch learnings.chitt...@gmail.com wrote: Please find the attached worker log. I could see a stream closed exception. On Wed, Jan 7, 2015 at 10:51 AM, Xiangrui Meng men...@gmail.com wrote: Could you attach the executor log? That may help identify the root cause. -Xiangrui On Mon, Jan 5, 2015 at 11:12 PM, Priya Ch learnings.chitt...@gmail.com wrote: Hi All, The Word2Vec and TF-IDF algorithms in Spark mllib-1.1.0 are working only in local mode and not in distributed mode; a null pointer exception is thrown. Is this a bug in spark-1.1.0? Following is the code:
def main(args: Array[String]) {
  val conf = new SparkConf
  val sc = new SparkContext(conf)
  val documents = sc.textFile("hdfs://IMPETUS-DSRV02:9000/nlp/sampletext").map(_.split(" ").toSeq)
  val hashingTF = new HashingTF()
  val tf = hashingTF.transform(documents)
  tf.cache()
  val idf = new IDF().fit(tf)
  val tfidf = idf.transform(tf)
  val rdd = tfidf.map { vec =>
    println("vector is " + vec)
    (10)
  }
  rdd.saveAsTextFile("/home/padma/usecase")
}
Exception thrown: 15/01/06 12:36:09 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 15/01/06 12:36:10 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@impetus-dsrv05.impetus.co.in:33898/user/Executor#-1525890167] with ID 0 15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes) 15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes) 15/01/06 12:36:10 INFO storage.BlockManagerMasterActor: Registering block manager IMPETUS-DSRV05.impetus.co.in:35130 with 2.1 GB RAM 15/01/06 12:36:12 INFO network.ConnectionManager: Accepted connection from [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:46888]
15/01/06 12:36:12 INFO network.SendingConnection: Initiating connection to [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130] 15/01/06 12:36:12 INFO network.SendingConnection: Connected to [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130], 1 messages pending 15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 2.1 KB, free: 2.1 GB) 15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 10.1 KB, free: 2.1 GB) 15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_1 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 280.0 B, free: 2.1 GB) 15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_0 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 416.0 B, free: 2.1 GB) 15/01/06 12:36:13 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in): java.lang.NullPointerException: org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) java.lang.Thread.run(Thread.java:722) Thanks, Padma Ch - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is It Feasible for Spark 1.1 Broadcast to Fully Utilize the Ethernet Card Throughput?
In the current implementation of TorrentBroadcast, the blocks are fetched one-by-one in a single thread, so it cannot fully utilize the network bandwidth. Davies On Fri, Jan 9, 2015 at 2:11 AM, Jun Yang yangjun...@gmail.com wrote: Guys, I have a question regarding the Spark 1.1 broadcast implementation. In our pipeline, we have a large multi-class LR model, which is about 1 GiB in size. To exploit Spark parallelism, a natural idea is to broadcast this model file to the worker nodes. However, it looks like broadcast performance is not very good. While broadcasting the model file, I monitored the network card throughput of the worker nodes; their recv/write throughput is only around 30~40 MiB (our server boxes are equipped with 100MiB ethernet cards). Is this a real limitation of the Spark 1.1 broadcast implementation? Or is there some configuration or trick that can help Spark broadcast perform better? Thanks -- yangjun...@gmail.com http://hi.baidu.com/yjpro
Re: DeepLearning and Spark ?
You are not the first :) and probably not the fifth to have this question. A parameter server is not included in the Spark framework, and I've seen all kinds of hacks to improvise one: REST API, HDFS, Tachyon, etc. Not sure if an 'official' benchmark implementation will be released soon. On 9 January 2015 at 10:59, Marco Shaw marco.s...@gmail.com wrote: Pretty vague on details: http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199 On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, DeepLearning algorithms are popular and achieve state-of-the-art performance on several real-world machine learning problems. Currently there is no DL implementation in Spark and I wonder if there is ongoing work on this topic. We can do DL in Spark with Sparkling Water and H2O, but this adds an additional software stack. Deeplearning4j seems to implement a distributed version of many popular DL algorithms. Porting DL4j to Spark could be interesting. Google describes an implementation of large-scale DL in this paper http://research.google.com/archive/large_deep_networks_nips2012.html, based on model parallelism and data parallelism. So, I'm trying to imagine what a good design for DL algorithms in Spark would be. Spark already has RDDs (for data parallelism). Can GraphX be used for the model parallelism (as DNNs are generally designed as DAGs)? And what about using GPUs for local parallelism (a mechanism to push partitions into GPU memory)? What do you think about this? Cheers, Jao
RE: RowMatrix.multiply() ?
I’m resurrecting this thread because I’m interested in doing a transpose on a RowMatrix. There is this other thread too: http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-multiplication-in-spark-td12562.html which presents https://issues.apache.org/jira/browse/SPARK-3434, which is still in progress at this time. Is this the correct Jira issue for the transpose operation? ETA? Thanks a lot! -A From: Reza Zadeh [mailto:r...@databricks.com] Sent: October-15-14 1:48 PM To: ll Cc: u...@spark.incubator.apache.org Subject: Re: RowMatrix.multiply() ? Hi, We are currently working on distributed matrix operations. Two RowMatrices cannot currently be multiplied together, nor can they be added. The functionality will be added soon. You can of course achieve this yourself by using IndexedRowMatrix and doing one join per operation you requested. Best, Reza On Wed, Oct 15, 2014 at 8:50 AM, ll duy.huynh@gmail.com wrote: hi.. it looks like RowMatrix.multiply() takes a local Matrix as a parameter and returns the result as a distributed RowMatrix. how do you perform this series of multiplications if A, B, C, and D are all RowMatrix? ((A x B) x C) x D) thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RowMatrix-multiply-tp16509.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
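Until SPARK-3434 lands, one way to sketch a transpose by hand is to explode an IndexedRowMatrix into coordinate entries and swap the indices. This treats the matrix as dense (real code may want to skip zeros for sparse data), and is my own sketch rather than an MLlib-provided operation:

```scala
import org.apache.spark.mllib.linalg.distributed.{
  CoordinateMatrix, IndexedRow, IndexedRowMatrix, MatrixEntry}

// Explode each indexed row into (i, j, value) entries, swap i and j,
// and rebuild a distributed matrix from the transposed entries.
def transpose(m: IndexedRowMatrix): CoordinateMatrix = {
  val entries = m.rows.flatMap { case IndexedRow(i, vec) =>
    vec.toArray.zipWithIndex.map { case (v, j) =>
      MatrixEntry(j.toLong, i, v) // entry (j, i) of the transpose
    }
  }
  new CoordinateMatrix(entries)
}
```

The resulting CoordinateMatrix can be converted back with toIndexedRowMatrix() when a row-oriented view is needed.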
Re: Parquet compression codecs not applied
This is a little confusing, but that code path is actually going through Hive, so the Spark SQL configuration does not help. Perhaps try: set parquet.compression=GZIP; On Fri, Jan 9, 2015 at 2:41 AM, Ayoub benali.ayoub.i...@gmail.com wrote: Hello, I tried to save a table created via the hive context as a parquet file, but whatever compression codec (uncompressed, snappy, gzip or lzo) I set via setConf, like: setConf("spark.sql.parquet.compression.codec", "gzip") the size of the generated files is always the same, so it seems like the spark context ignores the compression codec that I set. Here is a code sample applied via the spark shell: import org.apache.spark.sql.hive.HiveContext val hiveContext = new HiveContext(sc) hiveContext.sql("SET hive.exec.dynamic.partition = true") hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict") hiveContext.setConf("spark.sql.parquet.binaryAsString", "true") // required to make data compatible with impala hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip") hiveContext.sql("create external table if not exists foo (bar STRING, ts INT) Partitioned by (year INT, month INT, day INT) STORED AS PARQUET Location 'hdfs://path/data/foo'") hiveContext.sql("insert into table foo partition(year, month, day) select *, year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, day(from_unixtime(ts)) as day from raw_foo") I tried that with spark 1.2 and a 1.3 snapshot against hive 0.13, and I also tried it with Impala on the same cluster, which correctly applied the compression codecs. Does anyone know what the problem could be? Thanks, Ayoub. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
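A minimal illustration of Michael's suggestion: since the INSERT goes through the Hive code path, set the Hive/Parquet property rather than the Spark SQL one. This is a sketch using the table names from the snippet above.

```scala
// The Spark SQL codec setting is bypassed on the Hive write path,
// so set Parquet's own Hive property before inserting:
hiveContext.sql("SET parquet.compression=GZIP")
hiveContext.sql(
  "insert into table foo partition(year, month, day) " +
  "select *, year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, " +
  "day(from_unixtime(ts)) as day from raw_foo")
```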
Re: Spark SQL: Storing AVRO Schema in Parquet
Hi Raghavendra, This makes a lot of sense. Thank you. The problem is that I'm using Spark SQL right now to generate the parquet file. What I think I need to do is to use Spark directly, transform all rows from the SchemaRDD to avro objects, and supply them to saveAsNewAPIHadoopFile (from the PairRDD). From there, I can supply the avro schema to parquet via AvroParquetOutputFormat. It is not difficult, just not as simple as I would like, because a SchemaRDD can write to a Parquet file using its schema, and if I could supply the avro schema to parquet it would save me the transformation step to avro objects. I'm thinking of overriding the saveAsParquetFile method to allow me to persist the avro schema inside parquet. Is this possible at all? Best Regards, Jerry On Fri, Jan 9, 2015 at 2:05 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: I came across this: http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/. You can take a look. On Fri Jan 09 2015 at 12:08:49 PM Raghavendra Pandey raghavendra.pan...@gmail.com wrote: I have a similar kind of requirement where I want to push avro data into parquet. But it seems you have to do it on your own. There is the parquet-mr project that uses hadoop to do so. I am trying to write a spark job to do a similar kind of thing. On Fri, Jan 9, 2015 at 3:20 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users, I'm using spark SQL to create parquet files on HDFS. I would like to store the avro schema in the parquet metadata so that non-Spark SQL applications can marshall the data without the avro schema using the avro parquet reader. Currently, schemaRDD.saveAsParquetFile does not allow me to do that. Is there another API that allows me to do this? Best Regards, Jerry
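A hedged sketch of the route Jerry describes: converting SchemaRDD rows to Avro records and writing through AvroParquetOutputFormat so the Avro schema lands in the Parquet file metadata. Class and package names follow the parquet-mr (parquet-avro) project of that era; `toAvro` and `MyRecord` are assumed helpers/generated classes, not real APIs.

```scala
import org.apache.hadoop.mapreduce.Job
import parquet.avro.AvroParquetOutputFormat  // parquet-avro artifact, pre-org.apache.parquet naming

val job = new Job(sc.hadoopConfiguration)
// Register the Avro schema so it is stored in the Parquet metadata
AvroParquetOutputFormat.setSchema(job, MyRecord.getClassSchema)  // MyRecord: assumed Avro-generated class

schemaRdd
  .map(row => (null: Void, toAvro(row)))  // toAvro: assumed Row -> MyRecord converter
  .saveAsNewAPIHadoopFile(
    "hdfs://path/out.parquet",
    classOf[Void], classOf[MyRecord],
    classOf[AvroParquetOutputFormat],
    job.getConfiguration)
```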
Re: SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats?
The other thing to note here is that Spark SQL defensively copies rows when we switch into user code. This probably explains the difference between 1 and 2. The difference between 1 and 3 is likely the cost of decompressing the column buffers vs. accessing a bunch of uncompressed primitive objects. On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian lian.cs@gmail.com wrote: Hey Nathan, Thanks for sharing, this is a very interesting post :) My comments are inlined below. Cheng On 1/7/15 11:53 AM, Nathan McCarthy wrote: Hi, I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala via rdd.mapPartitions(…). Using the latest release 1.2.0. Simple example; load up some sample data from parquet on HDFS (about 380m rows, 10 columns) on a 7 node cluster. val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet") t.registerTempTable("test1") sqlC.cacheTable("test1") Now lets do some operations on it; I want the total sales quantities sold for each hour in the day so I choose 3 out of the 10 possible columns... sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by Hour").collect().foreach(println) After the table has been 100% cached in memory, this takes around 11 seconds. Lets do the same thing but via a MapPartitions call (this isn’t production ready code but gets the job done). val rddPC = sqlC.sql("select Hour, ItemQty, Sales from test1") rddPC.mapPartitions { case hrs => val qtySum = new Array[Double](24) val salesSum = new Array[Double](24) for (r <- hrs) { val hr = r.getInt(0) qtySum(hr) += r.getDouble(1) salesSum(hr) += r.getDouble(2) } (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println) I believe the evil thing that makes this snippet much slower is the for-loop. According to my early benchmark done with Scala 2.9, a for-loop can be orders of magnitude slower than a simple while-loop, especially when the body of the loop only does something as trivial as in this case. 
The reason is that a Scala for-loop is translated into corresponding foreach/map/flatMap/withFilter function calls. And that's exactly why Spark SQL tries to avoid for-loops or any other functional style code in critical paths (where every row is touched); we also use reusable mutable row objects instead of the immutable version to improve performance. You may check HiveTableScan, ParquetTableScan, InMemoryColumnarTableScan etc. for reference. Also, the `sum` function calls in your SQL code are translated into `o.a.s.s.execution.Aggregate` operators, which also use an imperative while-loop and reusable mutable rows. Another thing to notice is that the `hrs` iterator physically points to underlying in-memory columnar byte buffers, and the `for (r <- hrs) { ... }` loop actually decompresses and extracts values from the required byte buffers (this is the unwrapping process you mentioned below). Now this takes around ~49 seconds… Even though test1 table is 100% cached. The number of partitions remains the same… Now if I create a simple RDD of a case class HourSum(hour: Int, qty: Double, sales: Double) Convert the SchemaRDD; val rdd = sqlC.sql("select * from test1").map { r => HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8)) }.cache() //cache all the data rdd.count() Then run basically the same MapPartitions query; rdd.mapPartitions { case hrs => val qtySum = new Array[Double](24) val salesSum = new Array[Double](24) for (r <- hrs) { val hr = r.hour qtySum(hr) += r.qty salesSum(hr) += r.sales } (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println) This takes around 1.5 seconds! Albeit the memory footprint is much larger. I guess this 1.5 seconds doesn't include the time spent on caching the simple RDD? As I've explained above, in the first `mapPartitions` style snippet, columnar byte buffer unwrapping happens within the `mapPartitions` call. 
However, in this version, the unwrapping process happens when the `rdd.count()` action is performed. At that point, all values of all columns are extracted from the underlying byte buffers, and the portion of data you need is then manually selected and transformed into the simple case class RDD via the `map` call. If you include time spent on caching the simple case class RDD, it should be even slower than the first `mapPartitions` version. My thinking is that because SparkSQL does store things in a columnar format, there is some unwrapping to be done out of the column array buffers which takes time, and for some reason this just takes longer when I switch to mapPartitions (maybe it's unwrapping the entire row, even though I’m using just a subset of columns, or maybe there is some object creation/autoboxing going on when calling getInt or getDouble)…
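To make Cheng's point concrete, here is the first snippet from the thread with the for-comprehension replaced by an imperative while-loop over the iterator (a sketch using the thread's own variable names):

```scala
rddPC.mapPartitions { hrs =>
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)
  // A while-loop avoids the foreach/closure overhead
  // that a for-comprehension desugars into
  while (hrs.hasNext) {
    val r = hrs.next()
    val hr = r.getInt(0)
    qtySum(hr) += r.getDouble(1)
    salesSum(hr) += r.getDouble(2)
  }
  (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
}.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)
```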
Re: Failed to save RDD as text file to local file system
Have you found any resolution for this issue ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-save-RDD-as-text-file-to-local-file-system-tp21050p21067.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Failed to save RDD as text file to local file system
No, do you have any idea? Regards, Ningjun Wang Consulting Software Engineer LexisNexis 121 Chanlon Road New Providence, NJ 07974-1541 From: firemonk9 [via Apache Spark User List] [mailto:ml-node+s1001560n21067...@n3.nabble.com] Sent: Friday, January 09, 2015 2:56 PM To: Wang, Ningjun (LNG-NPV) Subject: Re: Failed to save RDD as text file to local file system Have you found any resolution for this issue ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-save-RDD-as-text-file-to-local-file-system-tp21050p21068.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Parquet predicate pushdown troubles
I am running the following (connecting to an external Hive Metastore): /a/shark/spark/bin/spark-shell --master spark://ip:7077 --conf *spark.sql.parquet.filterPushdown=true* val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) and then ran two queries: sqlContext.sql("select count(*) from table where partition='blah'") and sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604") According to the Input tab in the UI, both scan about 140G of data, which is the size of my whole partition. So I have two questions -- 1. is there a way to tell from the plan if a predicate pushdown is supposed to happen? I see this for the second query: res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:108 == Query Plan == == Physical Plan == Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L] Exchange SinglePartition Aggregate true, [], [COUNT(1) AS PartialCount#49L] OutputFaker [] Project [] ParquetTableScan [epoch#139L], (ParquetRelation list of hdfs files 2. am I doing something obviously wrong that this is not working? (I'm guessing it's not working because the input size for the second query shows unchanged and the execution time is almost 2x as long) thanks in advance for any insights
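One way to approach question 1 is to ask Spark SQL for the plan directly; if the pushdown fired, the predicate should show up inside the ParquetTableScan node rather than only in a separate Filter operator. A sketch (not verified against 1.2 specifically):

```scala
// Print the logical and physical plans for the filtered query
sqlContext.sql(
  "EXPLAIN EXTENDED select count(*) from table " +
  "where partition='blah' and epoch=1415561604"
).collect().foreach(println)
```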
updateStateByKey: cleaning up state for keys not in current window
I know that in order to clean up the state for a key I have to return None when I call updateStateByKey. However, as far as I understand, updateStateByKey only gets called for new keys (i.e. keys in current batch), not for all keys in the DStream. So, how can I clear the state for those keys in this case? Or, in other words, how can I clear the state for a key when Seq[V] is empty? Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini
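For what it's worth, my understanding is that the update function is in fact invoked every batch for every key that still has state, with an empty Seq[V] when no new values arrived — so expiry can be handled exactly there. A hedged sketch, where `TimestampedCount` and the idle threshold are invented for illustration:

```scala
case class TimestampedCount(count: Long, lastSeen: Long)

val maxIdleMs = 60 * 1000L  // assumed expiry threshold

def update(newValues: Seq[Long],
           state: Option[TimestampedCount]): Option[TimestampedCount] = {
  val now = System.currentTimeMillis()
  if (newValues.isEmpty)
    // Key absent from the current batch: keep state only while it is fresh;
    // returning None removes the key from the state DStream
    state.filter(s => now - s.lastSeen < maxIdleMs)
  else
    Some(TimestampedCount(state.map(_.count).getOrElse(0L) + newValues.sum, now))
}

// pairs: DStream[(String, Long)]
// val stateStream = pairs.updateStateByKey(update _)
```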
Re: RowMatrix.multiply() ?
Yes that is the correct JIRA. It should make it to 1.3. Best, Reza On Fri, Jan 9, 2015 at 11:13 AM, Adrian Mocanu amoc...@verticalscope.com wrote: I’m resurrecting this thread because I’m interested in doing transpose on a RowMatrix. ... Is this the correct Jira issue for the transpose operation? ETA? Thanks a lot! -A
Questions about Spark and HDFS co-location
I am running Spark 1.1.1 built against CDH4 and have a few questions regarding Spark performance related to co-location with HDFS nodes. I want to know whether (and how efficiently) Spark takes advantage of being co-located with an HDFS node. What I mean by this is: if a file is being read by a Spark executor and that file (or most of its blocks) is located in an HDFS DataNode on the same machine as a Spark worker, will it read directly off of disk, or does that data have to travel through the network in some way? Is there a distinct advantage to putting HDFS and Spark on the same box if it is possible or, due to the way blocks are distributed about a cluster, are we so likely to be moving files over the network that co-location doesn’t really make that much of a difference? Also, do you know of any papers/books/other resources (other than trying to dig through the spark code) which do a good job of explaining the Spark/HDFS data workflow (i.e. how data moves from disk -> HDFS -> Spark -> HDFS)? Thanks! Zach -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Spark-and-HDFS-co-location-tp21070.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Reading HBase data - Exception
This problem got resolved. In spark-submit I was using only the --driver-class-path option; once I added the --jars option so that the workers are aware of the dependent jar files, the problem went away. Need to check if there are any worker logs that give better information than the exception I was getting. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Reading-HBase-data-Exception-tp21009p21071.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Cleaning up spark.local.dir automatically
That's a worker setting which cleans up the files left behind by executors, so spark.cleaner.ttl isn't at the RDD level. After https://issues.apache.org/jira/browse/SPARK-1860 the cleaner won't clean up directories left by running executors. On Fri, Jan 9, 2015 at 7:38 AM, michael.engl...@nomura.com wrote: Thanks, I imagine this will kill any cached RDDs if their files are beyond the ttl? Thanks *From:* Raghavendra Pandey [mailto:raghavendra.pan...@gmail.com] *Sent:* 09 January 2015 15:29 *To:* England, Michael (IT/UK); user@spark.apache.org *Subject:* Re: Cleaning up spark.local.dir automatically You may like to look at spark.cleaner.ttl configuration which is infinite by default. Spark has that configuration to delete temp files time to time. On Fri Jan 09 2015 at 8:34:10 PM michael.engl...@nomura.com wrote: Hi, Is there a way of automatically cleaning up the spark.local.dir after a job has been run? I have noticed a large number of temporary files have been stored here and are not cleaned up. The only solution I can think of is to run some sort of cron job to delete files older than a few days. I am currently using a mixture of standalone and YARN spark builds. Thanks, Michael This e-mail (including any attachments) is private and confidential, may contain proprietary or privileged information and is intended for the named recipient(s) only. Unintended recipients are strictly prohibited from taking action on the basis of information in this e-mail and must contact the sender immediately, delete this e-mail (and all attachments) and destroy any hard copies. Nomura will not accept responsibility or liability for the accuracy or completeness of, or the presence of any virus or disabling code in, this e-mail. If verification is sought please request a hard copy. Any reference to the terms of executed transactions should be treated as preliminary only and subject to formal written confirmation by Nomura. 
Nomura reserves the right to retain, monitor and intercept e-mail communications through its networks (subject to and in accordance with applicable laws). No confidentiality or privilege is waived or lost by Nomura by any mistransmission of this e-mail. Any reference to Nomura is a reference to any entity in the Nomura Holdings, Inc. group. Please read our Electronic Communications Legal Notice which forms part of this e-mail: http://www.Nomura.com/email_disclaimer.htm
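For the standalone-worker side of this, the cleanup referenced above is controlled by worker properties (names from the Spark standalone-mode docs of this era; verify them against your version before relying on them):

```shell
# conf/spark-env.sh on each standalone worker -- periodic cleanup of old app dirs
SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true \
  -Dspark.worker.cleanup.interval=1800 \
  -Dspark.worker.cleanup.appDataTtl=604800"
```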
Re: DeepLearning and Spark ?
Does it make sense to use Spark's actor system (e.g. via SparkContext.env.actorSystem) to create a parameter server? On Fri, Jan 9, 2015 at 10:09 PM, Peng Cheng rhw...@gmail.com wrote: You are not the first :) probably not the fifth to have the question. A parameter server is not included in the Spark framework and I've seen all kinds of hacking to improvise it: REST api, HDFS, Tachyon, etc. Not sure if an 'official' benchmark implementation will be released soon
Re: DeepLearning and Spark ?
I am also looking at this domain. We could potentially use the broadcast capability in Spark to distribute the parameters. Haven't thought it through yet. Cheers k/ On Fri, Jan 9, 2015 at 2:56 PM, Andrei faithlessfri...@gmail.com wrote: Does it make sense to use Spark's actor system (e.g. via SparkContext.env.actorSystem) to create a parameter server?
Re: RDD Moving Average
Read this: http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E
Streaming Checkpointing
In Spark Streaming apps if I enable ssc.checkpoint(dir) does this checkpoint all RDDs? Or is it just checkpointing windowing and state RDDs? For example, if in a DStream I am using an iterative algorithm on a non-state non-window RDD, do I have to checkpoint it explicitly myself, or can I assume that ssc.checkpoint has taken care of checkpointing it?
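My understanding (hedged, not authoritative): ssc.checkpoint(dir) covers DStream metadata plus stateful/windowed DStreams, and it also sets the SparkContext checkpoint directory, so an iterative RDD built inside the stream still needs its own explicit checkpoint() call to truncate lineage. A sketch, where `step` is an assumed per-iteration transformation:

```scala
ssc.checkpoint("hdfs://path/checkpoints")  // also sets sc's checkpoint dir

stream.foreachRDD { rdd =>
  var current = rdd
  for (i <- 1 to 10) {
    current = current.map(step)   // step: assumed iterative transformation
    if (i % 3 == 0) {
      current.checkpoint()        // explicit checkpoint for the non-state RDD
      current.count()             // an action is needed to actually materialize it
    }
  }
}
```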
Re: EventBatch and SparkFlumeProtocol not found in spark codebase?
What's up with the IJ questions all of a sudden? This PR from yesterday contains a summary of the answer to your question: https://github.com/apache/spark/pull/3952 : Rebuild Project can fail the first time the project is compiled, because generated source files are not automatically generated. Try clicking the Generate Sources and Update Folders For All Projects button in the Maven Projects tool window to manually generate these sources. On Fri, Jan 9, 2015 at 10:03 AM, bit1...@163.com bit1...@163.com wrote: Hi, When I fetch the Spark code base and import it into IntelliJ IDEA as an SBT project and then build it with SBT, there are compile errors in the examples module complaining that EventBatch and SparkFlumeProtocol cannot be found; it looks like they should be in the org.apache.spark.streaming.flume.sink package. Not sure what happens. Thanks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is It Feasible for Spark 1.1 Broadcast to Fully Utilize the Ethernet Card Throughput?
You can try the following: - Increase spark.akka.frameSize (default is 10MB) - Try using torrentBroadcast Thanks Best Regards On Fri, Jan 9, 2015 at 3:41 PM, Jun Yang yangjun...@gmail.com wrote: Guys, I have a question regarding to Spark 1.1 broadcast implementation. In our pipeline, we have a large multi-class LR model, which is about 1GiB size. To employ the benefit of Spark parallelism, a natural thinking is to broadcast this model file to the worker node. However, it looks that broadcast performance is not quite good. During the process of broadcasting the model file, I just monitor the network card throughput of worker node, their recv/write throughput is just around 30~40 MiB( our server box is equipped with 100MiB ethernet card). Is this the real limitation of Spark 1.1 broadcast implementation? Or there may be some configuration or tricks that can help make Spark broadcast perform better. Thanks -- yangjun...@gmail.com http://hi.baidu.com/yjpro
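Put together, the two suggestions look roughly like this on the command line (property names from the Spark 1.x configuration docs; the frame size value is an arbitrary example and the class/jar are placeholders):

```shell
spark-submit \
  --conf spark.akka.frameSize=100 \
  --conf spark.broadcast.factory=org.apache.spark.broadcast.TorrentBroadcastFactory \
  --class com.example.Main app.jar
```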
KryoDeserialization getting java.io.EOFException
Hi, when I run a query in Spark SQL it gives me the following error. What possible reasons could cause this problem? java.io.EOFException at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:148) at org.apache.spark.sql.hbase.HBasePartitioner$$anonfun$readObject$1.apply(HBasePartitioner.scala:100) at org.apache.spark.sql.hbase.HBasePartitioner$$anonfun$readObject$1.apply(HBasePartitioner.scala:99) at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:126) at org.apache.spark.sql.hbase.HBasePartitioner.readObject(HBasePartitioner.scala:99) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/01/10 01:50:25 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1) java.io.EOFException at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:148) at org.apache.spark.sql.hbase.HBasePartitioner$$anonfun$readObject$1.apply(HBasePartitioner.scala:100) at org.apache.spark.sql.hbase.HBasePartitioner$$anonfun$readObject$1.apply(HBasePartitioner.scala:99) at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:126) at org.apache.spark.sql.hbase.HBasePartitioner.readObject(HBasePartitioner.scala:99) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at
OptionalDataException during Naive Bayes Training
Hi, I am using Spark version 1.1 in standalone mode in the cluster. Sometimes, during Naive Bayes training, I get an OptionalDataException at the line map at NaiveBayes.scala:109. I am getting the following exception on the console: java.io.OptionalDataException: java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1371) java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) java.util.HashMap.readObject(HashMap.java:1394) sun.reflect.GeneratedMethodAccessor626.invoke(Unknown Source) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:483) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) What could be the reason behind this? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/OptionalDataException-during-Naive-Bayes-Training-tp21059.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Queue independent jobs
You can parallelize on the driver side. The way to do it is almost exactly what you have here, where you're iterating over a local Scala collection of dates and invoking a Spark operation for each. Simply write dateList.par.map(...) to make the local map proceed in parallel. It should invoke the Spark jobs simultaneously. On Fri, Jan 9, 2015 at 10:46 AM, Anders Arpteg arp...@spotify.com wrote: Hey, Let's say we have multiple independent jobs that each transform some data and store it in distinct HDFS locations. Is there a nice way to run them in parallel? See the following pseudo code snippet: dateList.map(date => sc.hdfsFile(date).map(transform).saveAsHadoopFile(date)) It's unfortunate if they run in sequence, since all the executors are not used efficiently. What's the best way to parallelize the execution of these jobs? Thanks, Anders - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
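The `.par` suggestion above can be sketched with a plain Scala collection. Here `processDate` is a hypothetical stand-in for the real per-date Spark pipeline (`sc.hdfsFile(date).map(transform).saveAsHadoopFile(date)`); the point is only that `.par` runs the driver-side map on a thread pool, so each Spark action would be submitted concurrently:

```scala
// Minimal sketch of driver-side parallelism with Scala parallel collections.
object ParJobs {
  // Placeholder for the real work; in the actual job this body would
  // trigger a Spark action for the given date.
  def processDate(date: String): String = "processed-" + date

  def main(args: Array[String]): Unit = {
    val dateList = Seq("2015-01-07", "2015-01-08", "2015-01-09")
    // .par converts the collection to a parallel one, so the map below
    // runs its elements on multiple threads instead of sequentially.
    val results = dateList.par.map(processDate).toList
    println(results)
  }
}
```

Note that all the concurrently submitted jobs still share the same SparkContext and compete for the same executors, so this helps most when individual jobs leave the cluster underutilized.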
Re: SPARKonYARN failing on CDH 5.3.0 : container cannot be fetched because of NumberFormatException
Again, this is probably not the place for CDH-specific questions, and this one is already answered at http://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/CDH-5-3-0-container-cannot-be-fetched-because-of/m-p/23497#M478 On Fri, Jan 9, 2015 at 9:23 AM, Mukesh Jha me.mukesh@gmail.com wrote: I am using the pre-built spark-1.2.0-bin-hadoop2.4 from [1] to submit spark applications to yarn; I cannot find a pre-built Spark for CDH-5.x versions. So, in my case the org.apache.hadoop.yarn.util.ConverterUtils class is coming from spark-assembly-1.1.0-hadoop2.4.0.jar, which is part of the pre-built spark and hence causing this issue. How / where can I get Spark 1.2.0 built for CDH-5.3.0? I checked the Maven repo etc. with no luck. [1] https://spark.apache.org/downloads.html - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
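When no pre-built package matches, Spark can be built from source against a specific Hadoop version. The command below is a sketch to verify against the "Building Spark" docs for 1.2.0; the `-Dhadoop.version` value assumes CDH 5.3.0 ships Hadoop 2.5.0-cdh5.3.0:

```shell
# Build Spark 1.2.0 against CDH 5.3.0's Hadoop (run from the Spark source root).
# Verify the profile name and hadoop.version for your exact cluster.
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.0 -DskipTests clean package
```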
Re: Parallel execution on one node
The repartitioning has some overhead. Do your results show that perhaps this is what is taking the extra time? You can try reading the source file with more partitions instead of partitioning it after the fact. See the minPartitions argument to loadLibSVMFile. But before that, I'd assess whether the core computation here is even taking much time. If it's already a small fraction of the total runtime, speeding it up 16x doesn't do much. On Fri, Jan 9, 2015 at 6:57 AM, mikens msmel...@gmail.com wrote: Hello, I am new to Spark. I have adapted an example code to do binary classification using logistic regression. I tried it on rcv1_train.binary dataset using LBFGS.runLBFGS solver, and obtained correct loss. Now, I'd like to run code in parallel across 16 cores of my single CPU socket. If I understand correctly, parallelism in Spark is achieved by partitioning dataset into some number of partitions, approximately 3-4 times the amount of cores in the system. To partition the data, I am calling data.repartition(npart), where npart is number of partitions (16*4=64 in my case). I run the code as follows: spark-submit --master local[16] --class logreg target/scala-2.10/logistic-regression_2.10-1.0.2.jar 72 However, I do not observe any speedup compared to when I just use one partition. I would much appreciate your help understanding what I am doing wrong and why I am not seeing any speedup due to 16 cores. Please find my code below. 
Best, Mike

*CODE*
object logreg {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("logreg")
    val sc = new SparkContext(conf)
    val npart = args(0).toInt
    val data_ = MLUtils.loadLibSVMFile(sc, "rcv1_train.binary.0label").cache()
    val data = data_.repartition(npart) // partition dataset into npart partitions
    val lambda = 1.0 / data.count()
    val splits = data.randomSplit(Array(1.0, 0.0), seed = 11L)
    val training = splits(0).map(x => (x.label, MLUtils.appendBias(x.features))).cache()
    val numFeatures = data.take(1)(0).features.size
    val start = System.currentTimeMillis
    val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))
    val (weightsWithIntercept, loss) = LBFGS.runLBFGS(training, new LogisticGradient(), new SquaredL2Updater(), 10, 1e-14, 100, lambda, initialWeightsWithIntercept)
    val took = (System.currentTimeMillis - start) / 1000.0
    println("LBFGS.runLBFGS: " + took + " s")
    sc.stop()
  }
}
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parallel-execution-on-one-node-tp21052.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
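The advice above about reading with more partitions up front (instead of calling repartition afterwards) can be sketched like this. The `(sc, path, numFeatures, minPartitions)` overload of `loadLibSVMFile` exists in MLlib 1.x, but treat the exact arguments as an assumption to check against your Spark version's API docs; `sc` is an existing SparkContext:

```scala
import org.apache.spark.mllib.util.MLUtils

// Sketch: load the file already split into the desired number of partitions.
// numFeatures = -1 lets MLlib infer the feature dimension from the data.
val npart = 64
val data = MLUtils.loadLibSVMFile(sc, "rcv1_train.binary.0label",
  numFeatures = -1, minPartitions = npart).cache()
// No data.repartition(npart) needed afterwards: the shuffle that
// repartition() performs is avoided because the input is split on read.
```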
Re: PermGen issues on AWS
Thanks, I noticed this after posting. I'll try that. I also think that perhaps Clojure might be creating more classes than the equivalent Java would, so I'll nudge it a bit higher. On 9 January 2015 at 11:45, Sean Owen so...@cloudera.com wrote: It's normal for PermGen to be a bit more of an issue with Spark than for other JVM-based applications. You should simply increase the PermGen size, which I don't see in your command. -XX:MaxPermSize=256m allows it to grow to 256m for example. The right size depends on your total heap size and app. Also, Java 8 no longer has a permanent generation, so this particular type of problem and tuning is not needed. You might consider running on Java 8. On Fri, Jan 9, 2015 at 10:38 AM, Joe Wass jw...@crossref.org wrote: I'm running on an AWS cluster of 10 x m1.large (64 bit, 7.5 GiB RAM). FWIW I'm using the Flambo Clojure wrapper which uses the Java API but I don't think that should make any difference. I'm running with the following command: spark/bin/spark-submit --class mything.core --name "My Thing" --conf spark.yarn.executor.memoryOverhead=4096 --conf spark.executor.extraJavaOptions="-XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled" /root/spark/code/myjar.jar For one of the stages I'm getting errors:
- ExecutorLostFailure (executor lost)
- Resubmitted (resubmitted due to lost executor)
And I think they're caused by slave executor JVMs dying with this error:
java.lang.OutOfMemoryError: PermGen space
java.lang.Class.getDeclaredConstructors0(Native Method)
java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
java.lang.Class.getConstructor0(Class.java:2885)
java.lang.Class.newInstance(Class.java:350)
sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
java.security.AccessController.doPrivileged(Native Method)
sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
java.security.AccessController.doPrivileged(Native Method)
java.io.ObjectStreamClass.init(ObjectStreamClass.java:468)
java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
1 stage out of 14 (so far) is failing. My failing stage is 1768 succeeded / 1862 (940 failed). 7 tasks failed with OOM, 919 were Resubmitted (resubmitted due to lost executor).
Now my Aggregated Metrics by Executor shows that 10 out of 16 executors show CANNOT FIND ADDRESS, which I imagine means the JVM blew up and hasn't been restarted. Now the 'Executors' tab shows only 7 executors.
- Is this normal?
- Any ideas why this is happening?
- Any other measures I can take to prevent this?
- Is the rest of my app going to run on a reduced number of executors?
- Can I re-start the executors mid-application? This is a long-running job, so I'd like to do what I can whilst it's running, if possible.
- Am I correct in thinking that the --conf arguments are supplied to the JVMs of the slave executors, so they will be receiving the extraJavaOptions and memoryOverhead?
Thanks very much! Joe
Re: Mapping directory structure to columns in SparkSQL
Hi Michael, I have got the directory-based column support working, at least in a trial. I have put the trial code here: DirIndexParquet.scala https://github.com/MickDavies/spark-parquet-dirindex/blob/master/src/main/scala/org/apache/spark/sql/parquet/DirIndexParquet.scala (it has involved me copying quite a lot of newParquet). There are some tests here https://github.com/MickDavies/spark-parquet-dirindex/tree/master/src/test/scala/org/apache/spark/sql/parquet that illustrate use. I'd be keen to help in any way with the datasources API changes that you mention; would you like to discuss? Thanks Mick On 30 Dec 2014, at 17:40, Michael Davies michael.belldav...@gmail.com wrote: Hi Michael, I've looked through the example and the test cases and I think I understand what we need to do - so I'll give it a go. I think what I'd like to try to do is allow files to be added at any time, so perhaps I can cache partition info. Also, what may be useful for us would be to derive the schema from the set of all files; hopefully this is achievable too. Thanks Mick On 30 Dec 2014, at 04:49, Michael Armbrust mich...@databricks.com wrote: You can't do this now without writing a bunch of custom logic (see here for an example: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala). I would like to make this easier as part of improvements to the datasources api that we are planning for Spark 1.3. On Mon, Dec 29, 2014 at 2:19 AM, Mickalas michael.belldav...@gmail.com wrote: I see that there is already a request to add wildcard support to the SQLContext.parquetFile function: https://issues.apache.org/jira/browse/SPARK-3928.
What seems like a useful thing for our use case is to associate the directory structure with certain columns in the table, but it does not seem like this is supported. For example, we want to create parquet files on a daily basis associated with geographic regions and so will create a set of files under directories such as: * 2014-12-29/Americas * 2014-12-29/Asia * 2014-12-30/Americas * ... Where queries have predicates that match the column values determinable from the directory structure, it would be good to only extract data from matching files. Does anyone know if something like this is supported, or whether this is a reasonable thing to request? Mick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Mapping-directory-structure-to-columns-in-SparkSQL-tp20880.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Play Scala Spark Example
Hi guys, I'm running the following example: https://github.com/knoldus/Play-Spark-Scala on the same machine as the spark master, and the spark cluster was launched with the ec2 script. I'm stuck with these errors; any idea how to fix them? Regards Eduardo Calling the play app prints the following exception: [*error*] a.r.EndpointWriter - AssociationError [akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481] - [akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575]: Error [Shut down address: akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575] [ akka.remote.ShutDownAssociation: Shut down address: akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down. The master receives the spark application and generates the following stderr log page: 15/01/09 13:31:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:37856] 15/01/09 13:31:23 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:37856] 15/01/09 13:31:23 INFO util.Utils: Successfully started service 'sparkExecutor' on port 37856. 15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to MapOutputTracker: akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/MapOutputTracker 15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to BlockManagerMaster: akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/BlockManagerMaster 15/01/09 13:31:23 INFO storage.DiskBlockManager: Created local directory at /mnt/spark/spark-local-20150109133123-3805 15/01/09 13:31:23 INFO storage.DiskBlockManager: Created local directory at /mnt2/spark/spark-local-20150109133123-b05e 15/01/09 13:31:23 INFO util.Utils: Successfully started service 'Connection manager for block manager' on port 36936.
15/01/09 13:31:23 INFO network.ConnectionManager: Bound socket to port 36936 with id = ConnectionManagerId(ip-10-158-18-250.ec2.internal,36936) 15/01/09 13:31:23 INFO storage.MemoryStore: MemoryStore started with capacity 265.4 MB 15/01/09 13:31:23 INFO storage.BlockManagerMaster: Trying to register BlockManager 15/01/09 13:31:23 INFO storage.BlockManagerMaster: Registered BlockManager 15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/HeartbeatReceiver 15/01/09 13:31:54 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:57671] - [akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481] disassociated! Shutting down.
Newbie Question on How Tasks are Executed
I am writing my first project in Spark. It is an implementation of the Space Saving counting algorithm. I am trying to understand how tasks are executed in partitions. As you can see from my code, the algorithm keeps in memory only a small number of words, for example 100: the top-k ones, not all of them. For each new word that arrives, it replaces the least frequent one and updates the counts and errors. If the word exists, it increments its count. My question is where and how this code will be executed. For example, I set the RDD's parallelism to 4, so each task will be split into 4 partitions, right? When I collect the words, shouldn't I get back 4*k words, since I have 4 partitions with k words kept in each one (k words from each partition)? If true, why am I getting only k words as the result when I collect them? Where is the merging happening? Is there a way to know which partition returns what? Is there a partition id? How can I monitor each partition's execution? Is it possible to know, for example, which words get processed and where (I mean in which partition)? My code: === import org.apache.spark.{SparkConf, SparkContext} /** * Created by mixtou on 30/12/14.
*/
object SpaceSaving {
  var frequent_words_counters = scala.collection.immutable.Map[String, Array[Int]]();
  var guaranteed_words = scala.collection.immutable.Map[String, Array[Int]]();
  val top_k: Int = 100;
  var words_no: Int = 0;
  var tStart: Long = 0;
  var tStop: Long = 0;
  var fi: Double = 0.01;

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Space Saving Project").setMaster("local");
    val ctx = new SparkContext(sparkConf);
    val lines = ctx.textFile("/Users/mixtou/PhD/Courses/Databases_Advanced/Project/scala_spark_space_saving/src/main/resources/README.txt", 4)
      .map(line => line.toLowerCase());
    val nonEmptyLines = lines.filter(line => line.nonEmpty);
    val regex = "[,.:;'\\?\\-!\\(\\)\\+\\[\\]\\d+]".r;
    val cleanLines = nonEmptyLines.map(line => regex.replaceAllIn(line, ""));
    val dirtyWords = cleanLines.flatMap(line => line.split("\\s+"));
    val words = dirtyWords.filter(word => word.length > 3);
    words.foreach(word => space_saving_algorithm(word));
    if (frequent_words_counters.size > 0) {
      frequent_words_counters.foreach(line => println("Top Frequent Word: " + line._1 + " with count: " + line._2(0) + " and error: " + line._2(1)));
    }
    System.out.println("=== Throughput: " + 1000 * (words_no / (tStop - tStart)) + " words per second.");
    estimateGuaranteedFrequentWords();
    ctx.stop();
  }

  def space_saving_algorithm(word: String) = {
    if (frequent_words_counters.contains(word)) {
      val count = frequent_words_counters.get(word).get(0);
      val error = frequent_words_counters.get(word).get(1);
      frequent_words_counters += word -> Array[Int](count + 1, error);
    } else {
      if (frequent_words_counters.size < top_k) {
        frequent_words_counters += word -> Array[Int](1, 0);
      } else {
        replaceLeastEntry(word);
      }
    }
    if (words_no > 0) {
      tStop = java.lang.System.currentTimeMillis();
    } else {
      tStart = java.lang.System.currentTimeMillis();
    }
    words_no += 1;
  }

  def replaceLeastEntry(word: String): Unit = {
    var temp_list = frequent_words_counters.toList.sortWith((x, y) => x._2(0) > y._2(0));
    val word_count = temp_list.last._2(0);
    temp_list = temp_list.take(temp_list.length - 1);
    frequent_words_counters = temp_list.toMap[String, Array[Int]];
    frequent_words_counters += word -> Array[Int](word_count + 1, word_count);
  }

  def estimateGuaranteedFrequentWords(): Unit = {
    frequent_words_counters.foreach { tuple =>
      if (tuple._2(0) - tuple._2(1) < words_no * fi) {
        guaranteed_words -= tuple._1;
      } else {
        System.out.println("Guaranteed Word : " + tuple._1 + " with count: " + tuple._2(0) + " and error: " + tuple._2(1));
      }
    }
  }
}
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-Question-on-How-Tasks-are-Executed-tp21064.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
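To the partition-id questions in the message above: yes, Spark exposes the partition index via `mapPartitionsWithIndex`. A small sketch, assuming `words` is the RDD[String] built in the code:

```scala
// Sketch: tag every word with the index of the partition it lives in.
val tagged = words.mapPartitionsWithIndex { (pid, iter) =>
  // pid ranges over 0 until words.partitions.size
  iter.map(word => (pid, word))
}
// Summarize per-partition contents on the driver.
tagged.collect().groupBy(_._1).foreach { case (pid, ws) =>
  println("partition " + pid + " holds " + ws.size + " words")
}
```

Separately, note that `words.foreach(word => space_saving_algorithm(word))` mutates driver-side maps from inside a closure. Closures run on executors against their own copies of the captured state, so this only appears to work because the job runs with `setMaster("local")`, where everything is one JVM; on a real cluster the driver's maps would stay empty. That, not a hidden merge, is why only k words come back.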
Accidental kill in UI
So I had a Spark job with various failures, and I decided to kill it and start again. I clicked the 'kill' link in the web console, restarted the job on the command line and headed back to the web console and refreshed to see how my job was doing... the URL at the time was: /stages/stage/kill?id=1&terminate=true Which of course terminated the stage again. No loss, but if I'd waited a few hours before doing that, I would have lost data. I know to be careful next time, but isn't 'don't modify state as a result of a GET request' the first rule of HTTP? It could lead to an expensive mistake. Making this a POST would be a simple fix. Does anyone else think this is worth creating an issue for?
Re: Accidental kill in UI
(FWIW yes I think this should certainly be a POST. The link can become a miniature form to achieve this and then the endpoint just needs to accept POST only. You should propose a pull request.) On Fri, Jan 9, 2015 at 12:51 PM, Joe Wass jw...@crossref.org wrote: So I had a Spark job with various failures, and I decided to kill it and start again. I clicked the 'kill' link in the web console, restarted the job on the command line and headed back to the web console and refreshed to see how my job was doing... the URL at the time was: /stages/stage/kill?id=1&terminate=true Which of course terminated the stage again. No loss, but if I'd waited a few hours before doing that, I would have lost data. I know to be careful next time, but isn't 'don't modify state as a result of a GET request' the first rule of HTTP? It could lead to an expensive mistake. Making this a POST would be a simple fix. Does anyone else think this is worth creating an issue for? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: TF-IDF from spark-1.1.0 not working on cluster mode
Please find the attached worker log. I could see a stream closed exception. On Wed, Jan 7, 2015 at 10:51 AM, Xiangrui Meng men...@gmail.com wrote: Could you attach the executor log? That may help identify the root cause. -Xiangrui On Mon, Jan 5, 2015 at 11:12 PM, Priya Ch learnings.chitt...@gmail.com wrote: Hi All, The Word2Vec and TF-IDF algorithms in spark mllib-1.1.0 are working only in local mode and not in distributed mode. A null pointer exception is thrown. Is this a bug in spark-1.1.0? Following is the code:

def main(args: Array[String]) {
  val conf = new SparkConf
  val sc = new SparkContext(conf)
  val documents = sc.textFile("hdfs://IMPETUS-DSRV02:9000/nlp/sampletext").map(_.split(" ").toSeq)
  val hashingTF = new HashingTF()
  val tf = hashingTF.transform(documents)
  tf.cache()
  val idf = new IDF().fit(tf)
  val tfidf = idf.transform(tf)
  val rdd = tfidf.map { vec =>
    println("vector is " + vec)
    (10)
  }
  rdd.saveAsTextFile("/home/padma/usecase")
}

Exception thrown: 15/01/06 12:36:09 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 15/01/06 12:36:10 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp:// sparkexecu...@impetus-dsrv05.impetus.co.in:33898/user/Executor#-1525890167 ] with ID 0 15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes) 15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes) 15/01/06 12:36:10 INFO storage.BlockManagerMasterActor: Registering block manager IMPETUS-DSRV05.impetus.co.in:35130 with 2.1 GB RAM 15/01/06 12:36:12 INFO network.ConnectionManager: Accepted connection from [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:46888] 15/01/06 12:36:12 INFO network.SendingConnection: Initiating connection to [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130] 15/01/06 12:36:12 INFO network.SendingConnection: Connected to
[IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130], 1 messages pending 15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 2.1 KB, free: 2.1 GB) 15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 10.1 KB, free: 2.1 GB) 15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_1 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 280.0 B, free: 2.1 GB) 15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_0 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 416.0 B, free: 2.1 GB) 15/01/06 12:36:13 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in): java.lang.NullPointerException:
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)
Thanks, Padma Ch spark-rtauser-org.apache.spark.deploy.worker.Worker-1-IMPETUS-DSRV02.out Description: Binary data - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Parquet compression codecs not applied
Hello, I tried to save a table created via the hive context as a parquet file, but whatever compression codec (uncompressed, snappy, gzip or lzo) I set via setConf, like: setConf("spark.sql.parquet.compression.codec", "gzip") the size of the generated files is always the same, so it seems like the spark context ignores the compression codec that I set. Here is a code sample applied via the spark shell:

import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)
hiveContext.sql("SET hive.exec.dynamic.partition = true")
hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
hiveContext.setConf("spark.sql.parquet.binaryAsString", "true") // required to make data compatible with impala
hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")
hiveContext.sql("create external table if not exists foo (bar STRING, ts INT) Partitioned by (year INT, month INT, day INT) STORED AS PARQUET Location 'hdfs://path/data/foo'")
hiveContext.sql("insert into table foo partition(year, month, day) select *, year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, day(from_unixtime(ts)) as day from raw_foo")

I tried that with spark 1.2 and a 1.3 snapshot against hive 0.13, and I also tried it with Impala on the same cluster, which applied the compression codecs correctly. Does anyone know what could be the problem? Thanks, Ayoub. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
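One thing worth trying (an assumption, not a confirmed fix for this thread): since the write goes through Hive's `INSERT` path rather than Spark SQL's native parquet writer, Hive's own Parquet property may be the one that matters. Hive's Parquet integration reads `parquet.compression`; whether the Spark 1.2 HiveContext insert path honors it should be verified:

```scala
// Hypothetical workaround: set compression through Hive's own property
// before running the INSERT, instead of (or in addition to) the
// spark.sql.parquet.compression.codec setting.
hiveContext.sql("SET parquet.compression=GZIP")
```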
Re: Set EXTRA_JAR environment variable for spark-jobserver
We were able to resolve *SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up* as well. Spark-jobserver is working fine now and we need to experiment more. Thank you guys. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Set-EXTRA-JAR-environment-variable-for-spark-jobserver-tp20989p21060.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Did anyone tried overcommit of CPU cores?
Hi, As you said, --executor-cores defines the max number of tasks that an executor can run simultaneously. So, if you claim 10 cores, it is not possible to launch more than 10 tasks in an executor at the same time. In my experience, setting more cores than there are physical CPU cores will overload the CPU at some point during the execution of a spark application, especially when you are using algorithms in the mllib package. In addition, executor-cores affects the default level of parallelism of spark. Therefore, I recommend setting cores = physical cores by default. Moreover, I don't think overcommitting CPU will increase CPU utilization; in my opinion, it just lengthens the CPU wait queue. If you observe that the CPU load is very low (through ganglia, for example) and there is too much IO, maybe increasing the level of parallelism or serializing your objects is a good choice. Hoping this helps. Cheers Gen On Fri, Jan 9, 2015 at 10:12 AM, Xuelin Cao xuelincao2...@gmail.com wrote: Thanks, but how do I increase the tasks per core? For example, if the application claims 10 cores, is it possible to launch 100 tasks concurrently? On Fri, Jan 9, 2015 at 2:57 PM, Jörn Franke jornfra...@gmail.com wrote: Hallo, Based on experience with other software in virtualized environments, I cannot really recommend this. However, I am not sure how Spark reacts. You may face unpredictable task failures depending on utilization, and tasks connecting to external systems (databases etc.) may fail unexpectedly, which might be a problem for them (transactions not finishing etc.). Why not increase the tasks per core? Best regards Le 9 janv. 2015 06:46, Xuelin Cao xuelincao2...@gmail.com a écrit : Hi, I'm wondering whether it is a good idea to overcommit CPU cores on the spark cluster. For example, in our testing cluster, each worker machine has 24 physical CPU cores. However, we are allowed to set the CPU core number to 48 or more in the spark configuration file.
As a result, we are allowed to launch more tasks than the number of physical CPU cores. The motivation for overcommitting CPU cores is that, often, a task cannot consume 100% of a single CPU core (due to I/O, shuffle, etc.). So, overcommitting the CPU cores allows more tasks to run at the same time, and makes the resources be used more economically. But is there any reason that we should not do this? Has anyone tried it?
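For reference, the worker-side overcommit discussed above is configured in `conf/spark-env.sh` in standalone mode. `SPARK_WORKER_CORES` is a real standalone-mode setting; the value 48 is just the example from the thread:

```shell
# conf/spark-env.sh on each worker (standalone mode).
# Advertise 48 cores even though the machine has 24 physical cores,
# allowing up to 2x task overcommit per machine.
export SPARK_WORKER_CORES=48
```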
Zipping RDDs of equal size not possible
Hi Spark community, I have a problem zipping two RDDs of the same size and the same number of partitions. The error message says that zipping is only allowed on RDDs which are partitioned into chunks of exactly the same sizes. How can I ensure this? My workaround at the moment is to repartition both RDDs to only one partition, but that obviously does not scale. This problem originates from my need to draw n random tuple pairs (Tuple, Tuple) from an RDD[Tuple]. What I do is sample 2 * n tuples, split them into two parts, balance the sizes of these parts by filtering some tuples out, and zip them together. I would appreciate better approaches for both problems. Thanks in advance, Niklas
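One workaround (a sketch, not the only approach) is to key both RDDs by a stable index with `zipWithIndex` and join, which sidesteps `zip`'s identical-partitioning requirement at the cost of a shuffle. Here `rdd1` and `rdd2` are assumed to be existing RDDs with the same number of elements:

```scala
// Sketch: pair up two same-sized RDDs without zip's partitioning constraint.
val indexed1 = rdd1.zipWithIndex().map { case (v, i) => (i, v) }
val indexed2 = rdd2.zipWithIndex().map { case (v, i) => (i, v) }
// join matches elements by position: each resulting pair combines the
// i-th element of each RDD, though the output order is not guaranteed.
val pairs = indexed1.join(indexed2).values
```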
Re: Queue independent jobs
Awesome, it actually seems to work. Amazing how simple it can be sometimes... Thanks Sean! On Fri, Jan 9, 2015 at 12:42 PM, Sean Owen so...@cloudera.com wrote: You can parallelize on the driver side. The way to do it is almost exactly what you have here, where you're iterating over a local Scala collection of dates and invoking a Spark operation for each. Simply write dateList.par.map(...) to make the local map proceed in parallel. It should invoke the Spark jobs simultaneously. On Fri, Jan 9, 2015 at 10:46 AM, Anders Arpteg arp...@spotify.com wrote: Hey, Let's say we have multiple independent jobs that each transform some data and store it in distinct HDFS locations. Is there a nice way to run them in parallel? See the following pseudo code snippet: dateList.map(date => sc.hdfsFile(date).map(transform).saveAsHadoopFile(date)) It's unfortunate if they run in sequence, since all the executors are not used efficiently. What's the best way to parallelize the execution of these jobs? Thanks, Anders
Re: PermGen issues on AWS
It's normal for PermGen to be a bit more of an issue with Spark than for other JVM-based applications. You should simply increase the PermGen size, which I don't see in your command. -XX:MaxPermSize=256m allows it to grow to 256m for example. The right size depends on your total heap size and app. Also, Java 8 no longer has a permanent generation, so this particular type of problem and tuning is not needed. You might consider running on Java 8. On Fri, Jan 9, 2015 at 10:38 AM, Joe Wass jw...@crossref.org wrote: I'm running on an AWS cluster of 10 x m1.large (64 bit, 7.5 GiB RAM). FWIW I'm using the Flambo Clojure wrapper which uses the Java API but I don't think that should make any difference. I'm running with the following command: spark/bin/spark-submit --class mything.core --name "My Thing" --conf spark.yarn.executor.memoryOverhead=4096 --conf spark.executor.extraJavaOptions="-XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled" /root/spark/code/myjar.jar For one of the stages I'm getting errors:
- ExecutorLostFailure (executor lost)
- Resubmitted (resubmitted due to lost executor)
And I think they're caused by slave executor JVMs dying with this error:
java.lang.OutOfMemoryError: PermGen space
java.lang.Class.getDeclaredConstructors0(Native Method)
java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
java.lang.Class.getConstructor0(Class.java:2885)
java.lang.Class.newInstance(Class.java:350)
sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
java.security.AccessController.doPrivileged(Native Method)
sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
java.security.AccessController.doPrivileged(Native Method)
java.io.ObjectStreamClass.init(ObjectStreamClass.java:468)
java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
1 stage out of 14 (so far) is failing. My failing stage is 1768 succeeded / 1862 (940 failed). 7 tasks failed with OOM, 919 were Resubmitted (resubmitted due to lost executor). Now my Aggregated Metrics by Executor shows that 10 out of 16 executors show CANNOT FIND ADDRESS, which I imagine means the JVM blew up and hasn't been restarted. Now the 'Executors' tab shows only 7 executors.
- Is this normal?
- Any ideas why this is happening?
- Any other measures I can take to prevent this?
- Is the rest of my app going to run on a reduced number of executors? - Can I re-start the executors mid-application? This is a long-running job, so I'd like to do what I can whilst it's running, if possible. - Am I correct in thinking that the --conf arguments are supplied to the JVMs of the slave executors, so they will be receiving the extraJavaOptions and memoryOverhead? Thanks very much! Joe
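Applying Sean's advice, Joe's command with PermGen headroom added would look roughly like this (a sketch; 256m is an example value to tune against your heap, and the PermGen flags are unnecessary on Java 8):

```shell
spark/bin/spark-submit \
  --class mything.core \
  --name "My Thing" \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  --conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m -XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled" \
  /root/spark/code/myjar.jar
```

Note that spark.executor.extraJavaOptions applies to executors only; a driver hitting the same error would need the flag passed via --driver-java-options as well.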
Re: calculating the mean of SparseVector RDD
thanks for the suggestion -- however, looks like this is even slower. With the small data set I'm using, my aggregate function takes ~9 seconds and the colStats.mean() takes ~1 minute. However, I can't get it to run with the Kryo serializer -- I get the error: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5, required: 8 is there an easy/obvious fix? On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng men...@gmail.com wrote: There is some serialization overhead. You can try https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107 . -Xiangrui On Wed, Jan 7, 2015 at 9:42 AM, rok rokros...@gmail.com wrote: I have an RDD of SparseVectors and I'd like to calculate the mean, returning a dense vector. I've tried doing this with the following (using pyspark, spark v1.2.0): def aggregate_partition_values(vec1, vec2): vec1[vec2.indices] += vec2.values return vec1 def aggregate_combined_vectors(vec1, vec2): if all(vec1 == vec2): # then the vector came from only one partition return vec1 else: return vec1 + vec2 means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values, aggregate_combined_vectors) means = means / nvals This turns out to be really slow -- and doesn't seem to depend on how many vectors there are, so there seems to be some overhead somewhere that I'm not understanding. Is there a better way of doing this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
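For reference, the aggregation being attempted can be sketched in plain Python, without Spark or NumPy (each sparse vector is modeled as an (indices, values) pair, which is an assumption for illustration; SparseVector exposes the same two fields):

```python
# Sum sparse vectors into a dense accumulator, then divide by the count,
# mirroring what vals.aggregate(...) computes in the thread above.

def mean_of_sparse(vectors, vec_len):
    acc = [0.0] * vec_len
    for indices, values in vectors:
        for i, v in zip(indices, values):
            acc[i] += v
    n = len(vectors)
    return [x / n for x in acc]
```

One thing worth noting about the original snippet: the `all(vec1 == vec2)` test in the combiner is a fragile way to detect the single-partition case, since two genuinely equal partial sums would also trigger it; summing into a shared accumulator per partition (as above) avoids that ambiguity.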
PermGen issues on AWS
I'm running on an AWS cluster of 10 x m1.large (64 bit, 7.5 GiB RAM). FWIW I'm using the Flambo Clojure wrapper which uses the Java API but I don't think that should make any difference. I'm running with the following command: spark/bin/spark-submit --class mything.core --name "My Thing" --conf spark.yarn.executor.memoryOverhead=4096 --conf spark.executor.extraJavaOptions="-XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled" /root/spark/code/myjar.jar For one of the stages I'm getting errors: - ExecutorLostFailure (executor lost) - Resubmitted (resubmitted due to lost executor) And I think they're caused by slave executor JVMs dying with this error: java.lang.OutOfMemoryError: PermGen space java.lang.Class.getDeclaredConstructors0(Native Method) java.lang.Class.privateGetDeclaredConstructors(Class.java:2585) java.lang.Class.getConstructor0(Class.java:2885) java.lang.Class.newInstance(Class.java:350) sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399) sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396) java.security.AccessController.doPrivileged(Native Method) sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395) sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113) sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331) java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376) java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72) java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493) java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468) java.security.AccessController.doPrivileged(Native Method) java.io.ObjectStreamClass.init(ObjectStreamClass.java:468) java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365) java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602) java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) 
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 1 stage out of 14 (so far) is failing. My failing stage is 1768 succeeded / 1862 (940 failed). 7 tasks failed with OOM, 919 were Resubmitted (resubmitted due to lost executor). Now my Aggregated Metrics by Executor shows that 10 out of 16 executors show CANNOT FIND ADDRESS which I imagine means the JVM blew up and hasn't been restarted. Now the 'Executors' tab shows only 7 executors. - Is this normal? - Any ideas why this is happening? - Any other measures I can take to prevent this? - Is the rest of my app going to run on a reduced number of executors? - Can I re-start the executors mid-application? This is a long-running job, so I'd like to do what I can whilst it's running, if possible. - Am I correct in thinking that the --conf arguments are supplied to the JVMs of the slave executors, so they will be receiving the extraJavaOptions and memoryOverhead? Thanks very much! Joe
Queue independent jobs
Hey, Let's say we have multiple independent jobs that each transform some data and store in distinct HDFS locations, is there a nice way to run them in parallel? See the following pseudo code snippet: dateList.map(date => sc.hdfsFile(date).map(transform).saveAsHadoopFile(date)) It's unfortunate if they run in sequence, since all the executors are not used efficiently. What's the best way to parallelize execution of these jobs? Thanks, Anders
Re: Data locality running Spark on Mesos
Hi Tim, Thanks for your response. The benchmark I used just reads data in from HDFS and builds the Linear Regression model using methods from MLlib. Unfortunately, for various reasons, I can't open the source code for the benchmark at this time. I will try to replicate the problem using some sample benchmarks provided by the vanilla Spark distribution. It is very possible that I have something very screwy in my workload or setup. The parameters I used for Spark on Mesos are the following: driver memory = 1G total-executor-cores = 60 spark.executor.memory 6g spark.storage.memoryFraction 0.9 spark.mesos.coarse = true The rest are default values, so spark.locality.wait should just be 3000ms. I launched the Spark job on a separate node from the 10-node cluster using spark-submit. With regards to Mesos in fine-grained mode, do you have a feel for the overhead of launching executors for every task? Of course, any perceived slowdown will probably be very dependent on the workload. I just want to have a feel for the possible overhead (e.g., a factor of 2 or 3 slowdown?). If not a data locality issue, perhaps this overhead can be a factor in the slowdown I observed, at least in the fine-grained case. BTW: I'm using Spark ver 1.1.0 and Mesos ver 0.20.0 Thanks, Mike From: Tim Chen t...@mesosphere.io To: Michael V Le/Watson/IBM@IBMUS Cc: user user@spark.apache.org Date: 01/08/2015 03:04 PM Subject: Re: Data locality running Spark on Mesos How did you run this benchmark, and is there an open version I can try it with? And what are your configurations, like spark.locality.wait, etc? Tim On Thu, Jan 8, 2015 at 11:44 AM, mvle m...@us.ibm.com wrote: Hi, I've noticed running Spark apps on Mesos is significantly slower compared to stand-alone or Spark on YARN. I don't think it should be the case, so I am posting the problem here in case someone has some explanation or can point me to some configuration options I've missed. 
I'm running the LinearRegression benchmark with a dataset of 48.8GB. On a 10-node stand-alone Spark cluster (each node 4-core, 8GB of RAM), I can finish the workload in about 5min (I don't remember exactly). The data is loaded into HDFS spanning the same 10-node cluster. There are 6 worker instances per node. However, when running the same workload on the same cluster but now with Spark on Mesos (coarse-grained mode), the execution time is somewhere around 15min. Actually, I tried with fine-grained mode and giving each Mesos node 6 VCPUs (to hopefully get 6 executors like the stand-alone test), and I still get roughly 15min. I've noticed that when Spark is running on Mesos, almost all tasks execute with locality NODE_LOCAL (even in Mesos in coarse-grained mode). On stand-alone, the locality is mostly PROCESS_LOCAL. I think this locality issue might be the reason for the slowdown but I can't figure out why, especially for coarse-grained mode as the executors supposedly do not go away until job completion. Any ideas? Thanks, Mike -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Data-locality-running-Spark-on-Mesos-tp21041.html
Re: SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats?
Hey Nathan, Thanks for sharing, this is a very interesting post :) My comments are inlined below. Cheng On 1/7/15 11:53 AM, Nathan McCarthy wrote: Hi, I'm trying to use a combination of SparkSQL and 'normal' Spark/Scala via rdd.mapPartitions(…). Using the latest release 1.2.0. Simple example; load up some sample data from parquet on HDFS (about 380m rows, 10 columns) on a 7 node cluster. val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet") t.registerTempTable("test1") sqlC.cacheTable("test1") Now let's do some operations on it; I want the total sales quantities sold for each hour in the day so I choose 3 out of the 10 possible columns... sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by Hour").collect().foreach(println) After the table has been 100% cached in memory, this takes around 11 seconds. Let's do the same thing but via a MapPartitions call (this isn't production ready code but gets the job done). val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1") rddPC.mapPartitions { case hrs => val qtySum = new Array[Double](24) val salesSum = new Array[Double](24) for (r <- hrs) { val hr = r.getInt(0) qtySum(hr) += r.getDouble(1) salesSum(hr) += r.getDouble(2) } (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println) I believe the evil thing that makes this snippet much slower is the for-loop. According to my early benchmark done with Scala 2.9, a for-loop can be orders of magnitude slower than a simple while-loop, especially when the body of the loop only does something as trivial as this case. The reason is that a Scala for-loop is translated into corresponding foreach/map/flatMap/withFilter function calls. And that's exactly why Spark SQL tries to avoid for-loops or any other functional style code in critical paths (where every row is touched); we also use reusable mutable row objects instead of the immutable version to improve performance. 
You may check HiveTableScan, ParquetTableScan, InMemoryColumnarTableScan etc. for reference. Also, the `sum` function calls in your SQL code are translated into `o.a.s.s.execution.Aggregate` operators, which also use an imperative while-loop and reusable mutable rows. Another thing to notice is that the `hrs` iterator physically points to underlying in-memory columnar byte buffers, and the `for (r <- hrs) { ... }` loop actually decompresses and extracts values from required byte buffers (this is the unwrapping process you mentioned below). Now this takes around ~49 seconds… Even though test1 table is 100% cached. The number of partitions remains the same… Now if I create a simple RDD of a case class HourSum(hour: Int, qty: Double, sales: Double) Convert the SchemaRDD; val rdd = sqlC.sql("select * from test1").map { r => HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8)) }.cache() //cache all the data rdd.count() Then run basically the same MapPartitions query; rdd.mapPartitions { case hrs => val qtySum = new Array[Double](24) val salesSum = new Array[Double](24) for (r <- hrs) { val hr = r.hour qtySum(hr) += r.qty salesSum(hr) += r.sales } (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println) This takes around 1.5 seconds! Albeit the memory footprint is much larger. I guess this 1.5 seconds doesn't include the time spent on caching the simple RDD? As I've explained above, in the first `mapPartitions` style snippet, columnar byte buffer unwrapping happens within the `mapPartitions` call. However, in this version, the unwrapping process happens when the `rdd.count()` action is performed. At that point, all values of all columns are extracted from underlying byte buffers, and the portion of data you need is then manually selected and transformed into the simple case class RDD via the `map` call. 
If you include time spent on caching the simple case class RDD, it should be even slower than the first `mapPartitions` version. My thinking is that because SparkSQL does store things in a columnar format, there is some unwrapping to be done out of the column array buffers which takes time, and for some reason this just takes longer when I switch to mapPartitions (maybe it's unwrapping the entire row, even though I'm using just a subset of columns, or maybe there is some object creation/autoboxing going on when calling getInt or getDouble)… I've tried simpler cases too, like just summing sales. Running sum via SQL is fast (4.7 seconds), running a mapPartitions sum on a double RDD is even faster (2.6 seconds). But mapPartitions on the SchemaRDD; sqlC.sql("select SalesInclGST from test1").mapPartitions(iter => Iterator(iter.foldLeft(0.0)((t, r) => t + r.getDouble(0)))).sum takes a long time (33 seconds). In all these examples everything is fully cached in memory. And yes for these kinds
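The per-partition aggregation pattern under discussion, separated from the Spark SQL row-unwrapping question, can be sketched in plain Python (rows modeled as (hour, qty, sales) tuples; all names here are illustrative, not Spark API):

```python
# Each "partition" folds its rows into two 24-slot accumulators,
# then partial results are merged pairwise, as reduceByKey would.

def aggregate_partition(rows):
    qty_sum = [0.0] * 24
    sales_sum = [0.0] * 24
    for hour, qty, sales in rows:
        qty_sum[hour] += qty
        sales_sum[hour] += sales
    return qty_sum, sales_sum

def merge(a, b):
    # Element-wise sum of two (qty_sum, sales_sum) pairs.
    return ([x + y for x, y in zip(a[0], b[0])],
            [x + y for x, y in zip(a[1], b[1])])
```

The point of the thread is that the cost difference between the two Spark versions lies not in this arithmetic but in how each row reaches the loop: decompressed from columnar buffers per access versus read from an already-materialized case class.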
Cleaning up spark.local.dir automatically
Hi, Is there a way of automatically cleaning up the spark.local.dir after a job has been run? I have noticed a large number of temporary files have been stored here and are not cleaned up. The only solution I can think of is to run some sort of cron job to delete files older than a few days. I am currently using a mixture of standalone and YARN spark builds. Thanks, Michael
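A sketch of the cron-job approach Michael describes (the directory path and 2-day retention are assumptions to adapt; only run this when no job is active, since live shuffle and spill files also live under spark.local.dir):

```shell
# Delete scratch files under spark.local.dir not modified in the last 2 days.
SPARK_LOCAL_DIR="${SPARK_LOCAL_DIR:-/tmp/spark-local}"
find "$SPARK_LOCAL_DIR" -mindepth 1 -mtime +2 -delete 2>/dev/null || true
```

On YARN, executor scratch space is instead placed under the NodeManager's local dirs and cleaned up by YARN itself, so a cron job like this mainly matters for the standalone builds.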