spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD
Hi,

I am using newAPIHadoopRDD to load an RDD from HBase (using pyspark running as yarn-client), pretty much the standard case demonstrated in hbase_inputformat.py from the examples. The thing is that when I try the very same code on Spark 1.2, I get the error below. Based on similar cases on other forums, it suggests an incompatibility between MR1 and MR2. Why would this start happening now? Is it due to some change in how the classpath is resolved, which now picks up the MR2 jars first where before it picked up MR1? Is there any workaround for this?

Thanks,
Antony.

The error:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1093)
    at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500)
    at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Trouble with large Yarn job
Hey,

I have a job that keeps failing if too much data is processed, and I can't see how to get it working. I've tried repartitioning with more partitions and increasing the amount of memory for the executors (now about 12G, with 400 executors). Here is a snippet of the first part of the code, which succeeds without any problems:

val all_days = sc.union(
  ds.dateInterval(startDate, date).map(date =>
    sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
      .map(s => (
        (s.getUsername, s.getTrackUri),
        UserItemData(s.getUsername, s.getTrackUri,
          build_vector1(date, s),
          build_vector2(s))))
  )
).reduceByKey(sum_vectors)

I want to process 30 days of data or more, but am only able to process about 10 days. With more days of data (a lower startDate in the code above), the union above succeeds but the code below fails with "Error communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL for more detailed error messages). Here is a snippet of the code that fails:

val top_tracks = all_days.map(t => (t._1._2.toString, 1)).reduceByKey(_ + _)
  .filter(trackFilter)
  .repartition(4)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

val observation_data = all_days
  .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
  .join(top_tracks)

The calculation of top_tracks works, but the last mapPartitions task fails with the error above when given more than 10 days of data. I also tried increasing the spark.akka.askTimeout setting, but it still fails even after raising the timeout tenfold to 300 seconds. I'm using Spark 1.2 and Kryo serialization.

I realize that this is a rather long message, but I'm stuck and would appreciate any help or clues for resolving this issue. It seems to be an out-of-memory issue, but increasing the number of partitions does not seem to help.

Thanks,
Anders
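To make the data flow of the two snippets easier to follow, here is a plain-Python model (not Spark code) of what the pipeline computes on a toy input. The record layout (username, track URI, vector), the tuple representation of vectors, and the `min_users` threshold standing in for `trackFilter` are all hypothetical; `sum_vectors` is assumed to be element-wise addition.

```python
from collections import Counter


def union_and_reduce(days):
    """Model of sc.union(...).reduceByKey(sum_vectors): merge all days and
    sum the vectors per (username, track) key. Each day is a list of
    (user, track, vector) records; vectors are tuples of numbers."""
    merged = {}
    for day in days:
        for user, track, vec in day:
            key = (user, track)
            if key in merged:
                merged[key] = tuple(a + b for a, b in zip(merged[key], vec))
            else:
                merged[key] = vec
    return merged


def top_tracks(merged, min_users):
    """Model of map(t => (track, 1)).reduceByKey(_ + _).filter(trackFilter),
    with a hypothetical filter keeping tracks seen for >= min_users users."""
    counts = Counter(track for (_user, track) in merged)
    return {track: n for track, n in counts.items() if n >= min_users}


def observation_data(merged, tops):
    """Model of the join on track: (track, (vector, count)), popular tracks only."""
    return sorted(
        (track, (vec, tops[track]))
        for (_user, track), vec in merged.items()
        if track in tops
    )
```

The join keeps every (user, track) record whose track survived the filter, so its output size grows with the number of days even when `top_tracks` itself stays small.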
Re: How to merge a RDD of RDDs into one uber RDD
You can also use the join function of RDD. This is actually a kind of append function that adds up all the RDDs and creates one uber RDD.
Re: Is it possible to do incremental training using ALSModel (MLlib)?
As I recall, Oryx (the old version, and I assume the new one too) provides something like this: http://cloudera.github.io/oryx/apidocs/com/cloudera/oryx/als/common/OryxRecommender.html#recommendToAnonymous-java.lang.String:A-float:A-int- though Sean will be more on top of that than me :)

On Mon, Jan 5, 2015 at 2:17 PM, Wouter Samaey wouter.sam...@storefront.be wrote:
One other idea was that I don't need to re-train the model, but simply pass all the current user's recent ratings (including ones created after the training) to the existing model… Is this a valid option?
Wouter Samaey
Managing Director, Storefront BVBA
Tel: +32 472 72 83 07
Web: http://storefront.be
LinkedIn: http://www.linkedin.com/in/woutersamaey

On 05 Jan 2015, at 13:13, Sean Owen so...@cloudera.com wrote:
In the first instance, I'm suggesting that ALS in Spark could perhaps expose a run() method that accepts a previous MatrixFactorizationModel and uses the product factors from it as the initial state instead. If anybody seconds that idea, I'll make a PR. The second idea is just fold-in: http://www.slideshare.net/srowen/big-practical-recommendations-with-alternating-least-squares/14 Whether you do this or something like SGD, inside or outside Spark, depends on your requirements, I think.

On Sat, Jan 3, 2015 at 12:04 PM, Wouter Samaey wouter.sam...@storefront.be wrote:
Do you know a place where I could find a sample or tutorial for this? I'm still very new at this, and struggling a bit... Thanks in advance, Wouter
Sent from my iPhone.

On 03 Jan 2015, at 10:36, Sean Owen so...@cloudera.com wrote:
Yes, it is easy to simply start a new factorization from the current model solution. It works well. That's more like incremental *batch* rebuilding of the model. That is not in MLlib but fairly trivial to add. You can certainly 'fold in' new data to approximately update with one new datum too, which you can find online. This is not quite the same idea as streaming SGD. I'm not sure this fits the RDD model well since it entails updating one element at a time, but mini-batch could be reasonable.

On Jan 3, 2015 5:29 AM, Peng Cheng rhw...@gmail.com wrote:
I was under the impression that ALS wasn't designed for it :-) The famous eBay online recommender uses SGD. However, you can try using the previous model as a starting point, and gradually reduce the number of iterations after the model stabilizes. I have never verified this idea, so you need to at least cross-validate it before putting it into production.

On 2 January 2015 at 04:40, Wouter Samaey wouter.sam...@storefront.be wrote:
Hi all, I'm curious about MLlib and whether it is possible to do incremental training on the ALSModel. Usually training is run first, and then you can query. But in my case, data is collected in real time and I want the predictions of my ALSModel to consider the latest data without a complete re-training phase. I've checked out these resources, but could not find any info on how to solve this: https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html My question fits in a larger picture where I'm using Prediction IO, and this in turn is based on Spark. Thanks in advance for any advice! Wouter
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-do-incremental-training-using-ALSModel-MLlib-tp20942.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
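Sean's "fold-in" suggestion amounts to holding the item (product) factors fixed and solving a small regularized least-squares problem for one user's factor vector from that user's latest ratings. A minimal plain-Python sketch, assuming rank 2, explicit ratings and plain ridge regularization (MLlib's ALS may weight λ differently); the item-factor dictionary stands in for the product features of an already-trained model and is hypothetical.

```python
def fold_in_user(item_factors, ratings, lam=0.1):
    """Fold one user into a fixed rank-2 model.

    Returns the vector x minimising sum_i (r_i - x . y_i)^2 + lam * |x|^2
    over the rated items i, i.e. x = (Y^T Y + lam*I)^-1 Y^T r, solved with
    Cramer's rule for the 2x2 system.
    item_factors: dict item -> (f1, f2); ratings: dict item -> rating."""
    a11, a12, a22 = lam, 0.0, lam  # entries of the symmetric Y^T Y + lam*I
    b1, b2 = 0.0, 0.0              # entries of Y^T r
    for item, r in ratings.items():
        y1, y2 = item_factors[item]
        a11 += y1 * y1
        a12 += y1 * y2
        a22 += y2 * y2
        b1 += r * y1
        b2 += r * y2
    det = a11 * a22 - a12 * a12
    if det == 0:
        raise ValueError("singular system: rate more items or use lam > 0")
    return ((b1 * a22 - a12 * b2) / det, (a11 * b2 - a12 * b1) / det)


def predict(user_vec, item_vec):
    """Predicted rating: dot product of user and item factors."""
    return sum(u * v for u, v in zip(user_vec, item_vec))
```

Because only the new user's vector is recomputed, this is an approximation: the item factors do not move until the next full (or warm-started) retraining.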
Re: Is it possible to do incremental training using ALSModel (MLlib)?
You’re right, Nick! This function does exactly that. Sean has already helped me greatly. Thanks for your reply.
Wouter Samaey
Managing Director, Storefront BVBA
Tel: +32 472 72 83 07
Web: http://storefront.be
LinkedIn: http://www.linkedin.com/in/woutersamaey
Re: Parquet schema changes
Fantastic - glad to see that it's in the pipeline!

On Wed, Jan 7, 2015 at 11:27 AM, Michael Armbrust mich...@databricks.com wrote:
I want to support this but we don't yet. Here is the JIRA: https://issues.apache.org/jira/browse/SPARK-3851

On Tue, Jan 6, 2015 at 5:23 PM, Adam Gilmore dragoncu...@gmail.com wrote:
Anyone got any further thoughts on this? I saw that the _metadata file seems to store the schema of every single part (i.e. file) in the Parquet directory, so in theory it should be possible. Effectively, our use case is that we receive a stack of JSON that we want to encode to Parquet for high performance, but new fields may be added to the JSON structure, so we want to be able to handle that every time we encode to Parquet (we'll be doing it incrementally for performance).

On Mon, Jan 5, 2015 at 3:44 PM, Adam Gilmore dragoncu...@gmail.com wrote:
I saw that in the source, which is why I was wondering. I was mainly reading: http://blog.cloudera.com/blog/2013/10/parquet-at-salesforce-com/
“A query that tries to parse the organizationId and userId from the 2 logTypes should be able to do so correctly, though they are positioned differently in the schema. With Parquet, it’s not a problem. It will merge ‘A’ and ‘V’ schemas and project columns accordingly. It does so by maintaining a file schema in addition to merged schema and parsing the columns by referencing the 2.”
I know that each part file can have its own schema, but I saw in the Spark implementation that if there was no metadata file, it would just pick the first file and use that schema across the board. I'm not quite sure how other implementations like Impala deal with this, but I was really hoping there'd be a way to version the schema as new records are added and just project it through. It would be a godsend for semi-structured data.

On Tue, Dec 23, 2014 at 3:33 PM, Cheng Lian lian.cs@gmail.com wrote:
I must have missed something important here; could you please provide more clues on Parquet “schema versioning”? I wasn’t aware of this feature (which sounds really useful). In particular, are you referring to the following scenario:
1. Write some data whose schema is A to “t.parquet”, resulting in a file “t.parquet/parquet-r-1.part” on HDFS
2. Append more data whose schema B “contains” A but has more columns to “t.parquet”, resulting in another file “t.parquet/parquet-r-2.part” on HDFS
3. Now read “t.parquet”, and schemas A and B are expected to be merged
If this is the case, then current Spark SQL doesn’t support this. We assume the schemas of all data within a single Parquet file (which is an HDFS directory with multiple part-files) are identical.

On 12/22/14 1:11 PM, Adam Gilmore wrote:
Hi all, I understand that Parquet allows for schema versioning automatically in the format; however, I'm not sure whether Spark supports this. I'm saving a SchemaRDD to a Parquet file, registering it as a table, then doing an insertInto with a SchemaRDD with an extra column. The second SchemaRDD does in fact get inserted, but the extra column isn't present when I try to query it with Spark SQL. Is there anything I can do to get this working how I'm hoping?
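The behaviour Adam is after (and which, per Cheng and Michael, Spark SQL 1.2 does not yet provide; see SPARK-3851) can be pictured as "union the part-file schemas, then read each file through the merged schema". A plain-Python illustration of that idea only, not Spark's or Parquet's implementation; schemas are modelled as lists of (column, type) pairs, and `pageUrl` is a hypothetical added column.

```python
def merge_schemas(*schemas):
    """Union the fields of several part-file schemas, keeping first-seen order.
    Each schema is a list of (column_name, type_name) pairs; the same column
    appearing with two different types is treated as an error."""
    merged = []
    types = {}
    for schema in schemas:
        for name, typ in schema:
            if name not in types:
                types[name] = typ
                merged.append((name, typ))
            elif types[name] != typ:
                raise TypeError(f"conflicting types for {name}: {types[name]} vs {typ}")
    return merged


def project(record, merged):
    """Read a record written with an older schema through the merged schema:
    columns that file does not have come back as None."""
    return {name: record.get(name) for name, _typ in merged}
```

Matching columns by name rather than position is what lets the two logTypes in the Cloudera post place organizationId and userId differently and still be read correctly.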
Re: How to merge a RDD of RDDs into one uber RDD
Thank you for the response; I will certainly try that out. I have changed my code so that the first map, files.map, is now files.flatMap, which I guess does something similar to what you are saying: it gives me a List[] of elements (in this case LabeledPoints; I could also use RDDs), which I then turned into a mega RDD. The current problem seems to be gone (I no longer get the NPE), but further down I am getting an IndexOutOfBounds error, so I am trying to figure out whether the original problem is manifesting itself as a new one. Regards -Ravi -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-uber-RDD-tp20986p21012.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
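Switching from map to flatMap is what turns "a collection of per-file lists" into one flat collection of records. A plain-Python model of the difference; `load_points` is a hypothetical stand-in for parsing one file into LabeledPoints, and in Spark the same contrast holds between RDD.map and RDD.flatMap.

```python
from itertools import chain


def load_points(path):
    """Hypothetical loader: returns the parsed records of one file as a list."""
    fake_files = {"f1": [1, 2], "f2": [3], "f3": []}
    return fake_files[path]


files = ["f1", "f2", "f3"]

# files.map(load): one element per file, and each element is itself a list.
mapped = [load_points(f) for f in files]

# files.flatMap(load): the per-file lists are concatenated into one collection,
# and empty files simply contribute nothing.
flat_mapped = list(chain.from_iterable(load_points(f) for f in files))
```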
example insert statement in Spark SQL
Hi, are insert statements supported in Spark? If so, can you please give me an example? Rgds -- Niranda
Spark SQL: The cached columnar table is not columnar?
Hi,

Curiouser and curiouser. I'm puzzled by the Spark SQL cached table. In theory, the cached table should be a columnar table, and only the columns referenced in my SQL should be scanned. However, in my test, I always see the whole table being scanned even though I only select one column. Here is my code:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

sqlContext.jsonFile("/data/ad.json").registerTempTable("adTable")
sqlContext.cacheTable("adTable") // The table has 10 columns

// First run: cache the table into memory
sqlContext.sql("select * from adTable").collect

// Second run: only one column is used, so it should only scan a small fraction of the data
sqlContext.sql("select adId from adTable").collect
sqlContext.sql("select adId from adTable").collect
sqlContext.sql("select adId from adTable").collect

What I found is that every time I run the SQL, the Web UI shows the same total amount of input data: the size of the whole table. Is anything wrong? My expectation is:
1. The cached table is stored as a columnar table
2. Since I only need one column in my SQL, the total amount of input data shown in the Web UI should be very small
But that is not what I see at all. Why?

Thanks
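The expectation in the question is that a columnar cache lets `select adId` touch only the adId column. A toy plain-Python model of that idea (not Spark's in-memory columnar implementation; the `clicks` and `region` columns are hypothetical). Whether the Web UI's input-size metric reflects per-column reads is a separate question that this sketch does not answer.

```python
def to_columnar(rows, columns):
    """Pivot row dicts into one list per column (a toy columnar layout)."""
    return {c: [row[c] for row in rows] for c in columns}


def scan(table, wanted):
    """Read only the wanted columns; also report how many cells were read."""
    result = {c: table[c] for c in wanted}
    cells_read = sum(len(values) for values in result.values())
    return result, cells_read
```

In a row layout, answering `select adId` would mean visiting every row with all of its fields; in the columnar layout, the projection only reads the adId list.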
spark-network-yarn 2.11 depends on spark-network-shuffle 2.10
It seems that spark-network-yarn compiled for Scala 2.11 depends on spark-network-shuffle compiled for Scala 2.10. This causes cross-version dependency conflicts in sbt. Is this a publishing error? http://www.uploady.com/#!/download/6Yn95UZA0DR/3taAJFjCJjrsSXOR
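Scala libraries are published with the Scala binary version as an artifact-ID suffix (`_2.10`, `_2.11`), which is why a 2.11 artifact pulling in a 2.10 one shows up as a conflict. A small plain-Python sketch that groups a list of resolved artifact IDs by suffix and flags a mix; the input list is hypothetical (for example, copied from the build's dependency report).

```python
import re
from collections import defaultdict

# Artifact IDs such as "spark-network-yarn_2.11": name, then _<scala binary version>.
SUFFIX = re.compile(r"^(?P<name>.+)_(?P<scala>2\.\d+)$")


def scala_versions(artifact_ids):
    """Group artifact IDs by Scala binary-version suffix; Java-only artifacts
    without a suffix are ignored."""
    by_version = defaultdict(list)
    for artifact in artifact_ids:
        m = SUFFIX.match(artifact)
        if m:
            by_version[m.group("scala")].append(m.group("name"))
    return dict(by_version)


def has_cross_version_conflict(artifact_ids):
    """True if the resolved artifacts mix more than one Scala binary version."""
    return len(scala_versions(artifact_ids)) > 1
```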
Re: disable log4j for spark-shell
Edit your conf/log4j.properties file (copy it from conf/log4j.properties.template if it does not exist yet) and change the following line:

log4j.rootCategory=INFO, console

to

log4j.rootCategory=ERROR, console

Another approach would be to fire up spark-shell and type in the following:

import org.apache.log4j.Logger
import org.apache.log4j.Level

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

You won't see any logs from the org.* and akka.* loggers (which include Spark's own) after that.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/disable-log4j-for-spark-shell-tp11278p21010.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
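Setting the level on "org" silences Spark because log4j loggers form a hierarchy by dotted name, and a logger with no level of its own inherits the nearest ancestor's. Python's standard logging module follows the same rule, so the analogy below shows the effect (it is Python logging, not log4j; `OFF` is a stand-in for `Level.OFF`, and the logger names are illustrative).

```python
import logging

# Stand-in for log4j's Level.OFF: one step above CRITICAL, so nothing passes.
OFF = logging.CRITICAL + 1

logging.getLogger("org").setLevel(OFF)
logging.getLogger("akka").setLevel(OFF)

# Child loggers have no level of their own, so they inherit from "org"/"akka".
spark_logger = logging.getLogger("org.apache.spark.SparkContext")
akka_logger = logging.getLogger("akka.remote.Remoting")

# A logger outside those trees still falls back to the root logger's level.
app_logger = logging.getLogger("com.example.MyApp")
```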
Re: Reading Data Using TextFileStream
How about the following code? I'm not quite sure what you were doing inside the flatMap and foreach.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class Test1 {
  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "JavaWordCount", new Duration(2));
    JavaDStream<String> textStream = ssc.textFileStream("user/huser/user/huser/flume"); // Data directory path in HDFS
    textStream.print();
    System.out.println("Welcome TO Flume Streaming");
    ssc.start();
    ssc.awaitTermination();
  }
}

Thanks
Best Regards

On Wed, Jan 7, 2015 at 4:06 PM, Jeniba Johnson jeniba.john...@lntinfotech.com wrote:
Hi Akhil,
I had missed the forward slash in the directory path. After correcting the directory path, I am now facing the error below. Can anyone help me with this issue?
15/01/07 21:55:20 INFO dstream.FileInputDStream: Finding new files took 360 ms
15/01/07 21:55:20 INFO dstream.FileInputDStream: New files at time 142064792 ms:
15/01/07 21:55:20 INFO scheduler.JobScheduler: Added jobs for time 142064792 ms
15/01/07 21:55:20 INFO scheduler.JobScheduler: Starting job streaming job 142064792 ms.0 from job set of time 142064792 ms
---
Time: 142064792 ms
---
15/01/07 21:55:20 INFO scheduler.JobScheduler: Finished job streaming job 142064792 ms.0 from job set of time 142064792 ms
15/01/07 21:55:20 INFO scheduler.JobScheduler: Starting job streaming job 142064792 ms.1 from job set of time 142064792 ms
15/01/07 21:55:20 ERROR scheduler.JobScheduler: Error running job streaming job 142064792 ms.1
java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD.first(RDD.scala:1094)
    at org.apache.spark.api.java.JavaRDDLike$class.first(JavaRDDLike.scala:433)
    at org.apache.spark.api.java.JavaRDD.first(JavaRDD.scala:32)
    at xyz.Test1$2.call(Test1.java:67)
    at xyz.Test1$2.call(Test1.java:1)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)

Regards,
Jeniba Johnson

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Wednesday, January 07, 2015 12:11 PM
To: Jeniba Johnson
Cc: Hari Shreedharan (hshreedha...@cloudera.com); d...@spark.apache.org
Subject: Re: Reading Data Using TextFileStream

I think you need to start your streaming job, then put the files there to get them read. textFileStream doesn't read the existing files, I believe. Also, are you sure the path is not the following (i.e. with the / at the beginning that is missing)?

JavaDStream<String> textStream = ssc.textFileStream("/user/huser/user/huser/flume");

Thanks
Best Regards

On Wed, Jan 7, 2015 at 9:16 AM, Jeniba Johnson jeniba.john...@lntinfotech.com wrote:
Hi Hari,
I am trying to read data from a file which is stored in HDFS. Using Flume, the data is tailed and stored in HDFS. Now I want to read this data using textFileStream. Using the code below, I am not able to fetch the data from the file stored in HDFS. Can anyone help me with this issue?
import org.apache.spark.SparkConf; import
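Two things in this thread fit together: Akhil's point that a file stream only picks up files that appear after the job starts, and the trace, where `first()` is called inside `foreachRDD` (Test1.java:67) on a batch that contained no new files. A plain-Python model of both, under stated assumptions: "new" means a modification time later than the stream start and the previous poll (a simplification of Spark's actual rules), and `EmptyCollection` stands in for the Java `UnsupportedOperationException`.

```python
class EmptyCollection(Exception):
    """Stand-in for java.lang.UnsupportedOperationException("empty collection")."""


def first(batch):
    """Like RDD.first(): raises on an empty batch instead of returning None."""
    if not batch:
        raise EmptyCollection("empty collection")
    return batch[0]


def new_files(listing, start_time, last_seen):
    """Simplified file-stream poll: only files modified after the stream started
    (and after the previous poll) are picked up; older files are ignored.
    listing: dict file_name -> modification time."""
    cutoff = max(start_time, last_seen)
    return sorted(name for name, mtime in listing.items() if mtime > cutoff)


def handle_batch(lines, out):
    """Guarded version of a foreachRDD body: skip empty batches before first().
    In the Java API, checking rdd.take(1) before calling first() is one option."""
    if lines:
        out.append(first(lines))
```

With a file already sitting in the directory before `ssc.start()`, every poll returns nothing, each batch is empty, and an unguarded `first()` fails exactly as in the trace.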
Re: How to merge a RDD of RDDs into one uber RDD
I think you mean union(). Yes, you could also simply make an RDD for each file, and use SparkContext.union() to put them together.
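The distinction Sean draws is that union() is the "append" operation, while join matches records by key. A plain-Python model of the two on lists of key-value pairs (not Spark code; it only mirrors the semantics of SparkContext.union and a pair-RDD inner join).

```python
def union(*datasets):
    """Model of SparkContext.union: concatenate all elements, keeping duplicates."""
    return [x for ds in datasets for x in ds]


def join(left, right):
    """Model of a pair-RDD inner join: one output per pair of matching keys."""
    return [(k, (lv, rv)) for k, lv in left for k2, rv in right if k == k2]
```

Joining two per-file datasets would therefore drop every record whose key is not present in both, which is rarely what "merge into one uber RDD" means.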
Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD
Problems like this are always due to having code compiled for Hadoop 1.x run against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at runtime Hadoop 2.x is used. A common cause is actually bundling Spark / Hadoop classes with your app, when the app should just use the Spark / Hadoop provided by the cluster. It could also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x cluster.
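The error text itself points the same way: `JobContext` was found as an interface (its Hadoop 2.x form), while HBase's `TableInputFormatBase` was compiled expecting a class (its Hadoop 1.x form). A rough diagnostic sketch in Python that scans a classpath string and reports whether jars built for both Hadoop lines are present. The jar-name patterns are heuristics and assumptions (Hadoop 1.x ships MapReduce in `hadoop-core-1.x.jar`, Hadoop 2.x in `hadoop-mapreduce-client-*-2.x.jar`, and HBase 0.96/0.98 builds carry a `-hadoop1`/`-hadoop2` suffix); how you obtain the classpath string is up to you.

```python
import re

# Heuristic jar-name patterns; not exhaustive, and vendor builds may differ.
PATTERNS = [
    (re.compile(r"^hadoop-core-1\..*\.jar$"), "hadoop1"),
    (re.compile(r"^hadoop-mapreduce-client-.*-2\..*\.jar$"), "hadoop2"),
    (re.compile(r"-hadoop1\.jar$"), "hadoop1"),
    (re.compile(r"-hadoop2\.jar$"), "hadoop2"),
]


def classify(classpath, sep=":"):
    """Map each jar on a classpath string to 'hadoop1' or 'hadoop2' when a
    naming heuristic applies; all other jars are ignored."""
    found = {}
    for entry in classpath.split(sep):
        jar = entry.rsplit("/", 1)[-1]
        for pattern, flavour in PATTERNS:
            if pattern.search(jar):
                found[jar] = flavour
                break
    return found


def mixed_hadoop(classpath, sep=":"):
    """True if jars built for both Hadoop 1.x and 2.x appear on the classpath."""
    return len(set(classify(classpath, sep).values())) > 1
```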
Re: [MLLib] storageLevel in ALS
1.2. In run() you have
usersOut.setName("usersOut").persist(StorageLevel.MEMORY_AND_DISK)
productsOut.setName("productsOut").persist(StorageLevel.MEMORY_AND_DISK)

On Wed, Jan 7, 2015, 02:11 Xiangrui Meng men...@gmail.com wrote:
Which Spark version are you using? We made this configurable in 1.1: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L202 -Xiangrui

On Tue, Jan 6, 2015 at 12:57 PM, Fernando O. fot...@gmail.com wrote:
Hi, I was doing some tests with ALS and I noticed that if I persist the inner RDDs from a MatrixFactorizationModel, the RDD is not replicated; it seems like the storage level is hardcoded to MEMORY_AND_DISK. Do you think it makes sense to make that configurable?
[GraphX] Integration with TinkerPop3/Gremlin
Hi Spark/GraphX community, I'm wondering whether you have TinkerPop3/Gremlin on your radar (GitHub: https://github.com/tinkerpop/tinkerpop3, docs: http://www.tinkerpop.com/docs/3.0.0-SNAPSHOT). They've done amazing work refactoring their stack recently, and Gremlin is a very nice DSL for working with graphs. They even have a Scala client: https://github.com/mpollmeier/gremlin-scala. So far, they've used Hadoop for MapReduce tasks, and I think GraphX could nicely dig in. Any views? Thanks, Nicolas
max receiving rate in spark streaming
Hi experts! Is there any way to determine the effective receiving rate for Kafka with Spark Streaming? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/max-receiving-rate-in-spark-streaming-tp21013.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
TestSuiteBase based unit test using a sliding window join timesout
Hi, I extended org.apache.spark.streaming.TestSuiteBase for some testing, and I was able to run this test fine:
test("Sliding window join with 3 second window duration") {
  val input1 = Seq(
    Seq("req1"), Seq("req2", "req3"), Seq(), Seq("req4", "req5", "req6"), Seq("req7"), Seq(), Seq()
  )
  val input2 = Seq(
    Seq(("tx1", "req1")), Seq(), Seq(("tx2", "req3")), Seq(("tx3", "req2")), Seq(), Seq(("tx4", "req7")), Seq(("tx5", "req5"), ("tx6", "req4"))
  )
  val expectedOutput = Seq(
    Seq(("req1", (1, "tx1"))), Seq(), Seq(("req3", (1, "tx2"))), Seq(("req2", (1, "tx3"))), Seq(), Seq(("req7", (1, "tx4"))), Seq()
  )
  val operation = (rq: DStream[String], tx: DStream[(String, String)]) => {
    rq.window(Seconds(3), Seconds(1)).map(x => (x, 1)).join(tx.map { case (k, v) => (v, k) })
  }
  testOperation(input1, input2, operation, expectedOutput, useSet = true)
}
However, this seemingly correct test fails with an operation timeout:
test("Sliding window join with 3 second window duration + a tumbling window") {
  val input1 = Seq(
    Seq("req1"), Seq("req2", "req3"), Seq(), Seq("req4", "req5", "req6"), Seq("req7"), Seq()
  )
  val input2 = Seq(
    Seq(("tx1", "req1")), Seq(), Seq(("tx2", "req3")), Seq(("tx3", "req2")), Seq(), Seq(("tx4", "req7"))
  )
  val expectedOutput = Seq(
    Seq(("req1", (1, "tx1"))), Seq(("req2", (1, "tx3")), ("req3", (1, "tx3"))), Seq(("req7", (1, "tx4")))
  )
  val operation = (rq: DStream[String], tx: DStream[(String, String)]) => {
    rq.window(Seconds(3), Seconds(2)).map(x => (x, 1)).join(tx.window(Seconds(2), Seconds(2)).map { case (k, v) => (v, k) })
  }
  testOperation(input1, input2, operation, expectedOutput, useSet = true)
}
Stacktrace:
10033 was not less than 1 Operation timed out after 10033 ms
org.scalatest.exceptions.TestFailedException: 10033 was not less than 1 Operation timed out after 10033 ms
  at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
  at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
  at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
  at org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:338)
Does anybody know why this could be?
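One hypothesis worth checking, which this thread does not confirm, relies on two assumptions about TestSuiteBase internals. The first is that testOperation advances the test clock by one batch per element of expectedOutput and then waits for that many outputs. The second is that its batch duration is 1 second. If both hold, a window with a 2-second slide only emits on every second batch, so the second test would see fewer outputs than it waits for. The hypothetical helpers below only do that arithmetic and do not call Spark:

```python
def windowed_output_times(num_batches, batch_s, slide_s):
    """Batch times (seconds) at which a windowed stream emits a result.

    Assumes batches complete at batch_s, 2*batch_s, ... and that a window
    fires only when the batch time is a multiple of its slide duration.
    """
    if slide_s % batch_s != 0:
        raise ValueError("slide must be a multiple of the batch duration")
    times = [batch_s * i for i in range(1, num_batches + 1)]
    return [t for t in times if t % slide_s == 0]


def batches_needed(num_outputs, batch_s, slide_s):
    """Number of batches the clock must advance to collect num_outputs results."""
    return num_outputs * (slide_s // batch_s)


# First test: 1 s slide, so every batch produces an output.
print(windowed_output_times(7, 1, 1))  # [1, 2, 3, 4, 5, 6, 7]
# Second test: 2 s slide; advancing only 3 batches yields a single output.
print(windowed_output_times(3, 1, 2))  # [2]
# Collecting the 3 expected outputs would take 6 batches.
print(batches_needed(3, 1, 2))  # 6
```

Under those assumptions, the second test would need to run 6 batches rather than 3 before all three windowed results exist.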
Re: max receiving rate in spark streaming
If you are using the low-level consumer (https://github.com/dibbhatt/kafka-spark-consumer), you can tweak the rate by setting the _fetchSizeBytes value (https://github.com/dibbhatt/kafka-spark-consumer/blob/master/src/main/java/consumer/kafka/KafkaConfig.java#L37). The default is 64 KB; you can increase it to 1 MB or more depending on your cluster size. Thanks Best Regards On Wed, Jan 7, 2015 at 4:41 PM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi experts! Is there any way to determine the effective receiving rate for Kafka with Spark Streaming? Thanks
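Whatever knob you use to raise ingestion (such as _fetchSizeBytes above), a receiving rate only counts as effective if each batch finishes processing within the batch interval. Otherwise batches queue up. You can make a rough estimate from per-batch record counts and processing times, for example the "execution" figure in JobScheduler "Total delay" log lines like the ones quoted later in this digest. This is a minimal sketch with several assumptions: the helper names and the 0.8 headroom factor are illustrative, and it assumes processing time scales roughly linearly with batch size.

```python
def estimate_max_rate(records_per_batch, processing_times_s, headroom=0.8):
    """Rough upper bound (records/s) on an input rate that processing can keep up with.

    Assumes processing time grows roughly linearly with batch size, so the
    observed throughput (records per second of processing) bounds the
    sustainable input rate; `headroom` keeps some slack below that bound.
    """
    if not records_per_batch or len(records_per_batch) != len(processing_times_s):
        raise ValueError("need one processing time per batch")
    throughput = sum(records_per_batch) / sum(processing_times_s)
    return headroom * throughput


def keeps_up(processing_times_s, batch_interval_s):
    """True if every batch finished processing within the batch interval."""
    return all(t < batch_interval_s for t in processing_times_s)
```

If keeps_up returns False for the observed batches, raising the fetch size will only make the backlog grow faster.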
Re: Reading Data Using TextFileStream
You need to put some files into the location (/user/huser/user/huser/flume) after the job starts running; only then will it print anything. Also note that I missed the / in the above code. Thanks Best Regards On Wed, Jan 7, 2015 at 4:42 PM, Jeniba Johnson jeniba.john...@lntinfotech.com wrote: Hi Akhil, I had used the flatMap method so that the lines from a file would be printed as soon as I tailed it from Flume to HDFS. Using the code mentioned below, the lines from the file are not being printed. Output:
Welcome TO Flume Streaming
15/01/07 22:32:46 INFO dstream.ForEachDStream: metadataCleanupDelay = -1
15/01/07 22:32:46 INFO dstream.MappedDStream: metadataCleanupDelay = -1
15/01/07 22:32:46 INFO dstream.FileInputDStream: metadataCleanupDelay = -1
15/01/07 22:32:46 INFO dstream.FileInputDStream: Slide time = 2 ms
15/01/07 22:32:46 INFO dstream.FileInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/01/07 22:32:46 INFO dstream.FileInputDStream: Checkpoint interval = null
15/01/07 22:32:46 INFO dstream.FileInputDStream: Remember duration = 2 ms
15/01/07 22:32:46 INFO dstream.FileInputDStream: Initialized and validated org.apache.spark.streaming.dstream.FileInputDStream@6c8185d3
15/01/07 22:32:46 INFO dstream.MappedDStream: Slide time = 2 ms
15/01/07 22:32:46 INFO dstream.MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/01/07 22:32:46 INFO dstream.MappedDStream: Checkpoint interval = null
15/01/07 22:32:46 INFO dstream.MappedDStream: Remember duration = 2 ms
15/01/07 22:32:46 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@2b79174c
15/01/07 22:32:46 INFO dstream.ForEachDStream: Slide time = 2 ms
15/01/07 22:32:46 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/01/07 22:32:46 INFO dstream.ForEachDStream: Checkpoint interval = null
15/01/07 22:32:46 INFO dstream.ForEachDStream: Remember duration = 2 ms
15/01/07 22:32:46 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1ae894e0
15/01/07 22:32:46 INFO util.RecurringTimer: Started timer for JobGenerator at time 142065018
15/01/07 22:32:46 INFO scheduler.JobGenerator: Started JobGenerator at 142065018 ms
15/01/07 22:32:46 INFO scheduler.JobScheduler: Started JobScheduler
15/01/07 22:33:00 INFO dstream.FileInputDStream: Finding new files took 347 ms
15/01/07 22:33:00 INFO dstream.FileInputDStream: New files at time 142065018 ms:
15/01/07 22:33:00 INFO scheduler.JobScheduler: Added jobs for time 142065018 ms
15/01/07 22:33:00 INFO scheduler.JobScheduler: Starting job streaming job 142065018 ms.0 from job set of time 142065018 ms
---
Time: 142065018 ms
---
15/01/07 22:33:00 INFO scheduler.JobScheduler: Finished job streaming job 142065018 ms.0 from job set of time 142065018 ms
15/01/07 22:33:00 INFO scheduler.JobScheduler: Total delay: 0.424 s for time 142065018 ms (execution: 0.017 s)
15/01/07 22:33:00 INFO dstream.FileInputDStream: Cleared 0 old files that were older than 142065016 ms:
15/01/07 22:33:20 INFO dstream.FileInputDStream: Finding new files took 9 ms
15/01/07 22:33:20 INFO dstream.FileInputDStream: New files at time 142065020 ms:
---
15/01/07 22:33:20 INFO scheduler.JobScheduler: Starting job streaming job 142065020 ms.0 from job set of time 142065020 ms
15/01/07 22:33:20 INFO scheduler.JobScheduler: Added jobs for time 142065020 ms
Time: 142065020 ms
Regards, Jeniba Johnson
From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Wednesday, January 07, 2015 4:17 PM
To: Jeniba Johnson; user@spark.apache.org
Subject: Re: Reading Data Using TextFileStream
How about the following code? I'm not quite sure what you were doing inside the flatMap and foreach.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class Test1 {
  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "JavaWordCount", new Duration(2));
    JavaDStream<String>
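As Akhil notes, only files that show up after the job starts are printed. The Spark Streaming programming guide also asks for files to be moved or renamed into the monitored directory atomically, so that a half-written file is never picked up. Here is a minimal local-filesystem sketch of that pattern; the function and directory names are hypothetical. On HDFS, the analogous approach would be to upload into a staging directory and then `hdfs dfs -mv` the finished file into the watched one.

```python
import os
import tempfile


def publish_atomically(data, staging_dir, watched_dir, name):
    """Write `data` into staging_dir, then rename the finished file into watched_dir.

    os.replace is atomic when both directories are on the same filesystem, so a
    directory-watching reader sees either no file or the complete file.
    """
    staging_path = os.path.join(staging_dir, name)
    with open(staging_path, "w") as f:
        f.write(data)
    final_path = os.path.join(watched_dir, name)
    os.replace(staging_path, final_path)
    return final_path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        staging = os.path.join(root, "staging")
        watched = os.path.join(root, "flume")
        os.makedirs(staging)
        os.makedirs(watched)
        publish_atomically("line 1\nline 2\n", staging, watched, "events-0001.txt")
        print(os.listdir(watched))  # ['events-0001.txt']
```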
About logistic regression sample codes in pyspark
Hi all, Recently I played a little with both the naive and the MLlib Python sample code for logistic regression. In short, I wanted to compare naive and non-naive logistic regression results using the same initial weights and data. So I slightly modified both samples to use the same initial weights, and generated a text file containing lines of label and features separated by spaces. After one iteration the computed weights are the same (nice!), but on the second iteration the computed weights are different (and obviously for the remaining iterations too). Maybe this behaviour is related to the default regularizer and regularization parameter used by the MLlib implementation of LogisticRegressionWithSGD? What is the difference between the naive implementation and the MLlib implementation of logistic regression with stochastic gradient descent? Thanks Cedric -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/About-logistic-regression-sample-codes-in-pyspark-tp21015.html
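The thread does not answer this, but two differences would each produce exactly this pattern: an identical first iteration, then divergence from the second. Both are worth checking against the MLlib source for your Spark version.

- **Decaying step size.** As far as I know, MLlib's SGD updaters shrink the step as stepSize / sqrt(iteration), which equals a fixed step only at iteration 1.
- **L2 penalty.** A penalty proportional to the current weights contributes nothing when the initial weights are zero, so it too would first show up in iteration 2.

Below is a pure-Python sketch of both effects on made-up toy data; it does not call MLlib.

```python
import math


def logistic_gradient(w, x, y):
    """Gradient of the logistic loss for one example with label y in {0, 1}."""
    margin = sum(wi * xi for wi, xi in zip(w, x))
    p = 1.0 / (1.0 + math.exp(-margin))
    return [(p - y) * xi for xi in x]


def mean_gradient(w, data):
    """Average gradient over all examples (i.e. a mini-batch fraction of 1.0)."""
    total = [0.0] * len(w)
    for x, y in data:
        total = [t + g for t, g in zip(total, logistic_gradient(w, x, y))]
    return [t / len(data) for t in total]


def gradient_descent(data, w0, step, iterations, decay=False, reg_param=0.0):
    """Batch gradient descent on the logistic loss.

    decay=True uses step / sqrt(i) at iteration i (i starting at 1);
    reg_param > 0 adds an L2 penalty term reg_param * w to the gradient.
    """
    w = list(w0)
    for i in range(1, iterations + 1):
        eta = step / math.sqrt(i) if decay else step
        g = mean_gradient(w, data)
        w = [wi - eta * (gi + reg_param * wi) for wi, gi in zip(w, g)]
    return w


data = [([1.0, 2.0], 1), ([1.0, -1.0], 0), ([1.0, 0.5], 1)]
w0 = [0.0, 0.0]
for iters in (1, 2):
    plain = gradient_descent(data, w0, 1.0, iters)
    decayed = gradient_descent(data, w0, 1.0, iters, decay=True)
    regularized = gradient_descent(data, w0, 1.0, iters, reg_param=0.1)
    print(iters, plain == decayed, plain == regularized)
# prints "1 True True", then "2 False False"
```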
KafkaUtils not consuming all the data from all partitions
Hi Guys, I have a Kafka topic with 90 partitions, and I am running Spark Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10 Kafka receivers. My streaming job is running fine and there is no processing delay; it's just that data from some partitions never gets picked up. From the Kafka console I can see that each receiver is consuming data from 9 partitions, but the lag for some offsets keeps increasing. Below are my Kafka consumer parameters. Has any of you faced this kind of issue? If so, do you have any pointers to fix it?
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "3");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "2000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "1");
kafkaConf.put("rebalance.max.retries", "20");
-- Thanks Regards, Mukesh Jha me.mukesh@gmail.com
Re: KafkaUtils not consuming all the data from all partitions
Hi, Could you add the code where you create the Kafka consumer? -kr, Gerard. On Wed, Jan 7, 2015 at 3:43 PM, francois.garil...@typesafe.com wrote: Hi Mukesh, If my understanding is correct, each stream only has a single receiver. So, if you have each receiver consuming 9 partitions, you need 10 input DStreams to create 10 concurrent receivers: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving Would you mind sharing a bit more on how you achieve this? -- FG On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hi Guys, I have a Kafka topic with 90 partitions, and I am running Spark Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10 Kafka receivers. My streaming job is running fine and there is no processing delay; it's just that data from some partitions never gets picked up. From the Kafka console I can see that each receiver is consuming data from 9 partitions, but the lag for some offsets keeps increasing. Below are my Kafka consumer parameters. Has any of you faced this kind of issue? If so, do you have any pointers to fix it?
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "3");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "2000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "1");
kafkaConf.put("rebalance.max.retries", "20");
-- Thanks Regards, Mukesh Jha me.mukesh@gmail.com
FW: No APPLICATION_COMPLETE file created in history server log location upon pyspark job success
Hi, I am currently running pyspark jobs against Spark 1.1.0 on YARN. When I run example Java jobs such as spark-pi, the following files get created:
bash-4.1$ tree spark-pi-1420624364958
spark-pi-1420624364958
├── APPLICATION_COMPLETE
├── EVENT_LOG_1
└── SPARK_VERSION_1.1.0

0 directories, 3 files
However, when I run my pyspark job, no APPLICATION_COMPLETE file gets created:
bash-4.1$ tree pyspark-1420628130353
pyspark-1420628130353
├── EVENT_LOG_1
└── SPARK_VERSION_1.1.0

0 directories, 2 files
If I touch the file into this directory, the application just appears as not started in the history server UI. I am submitting jobs using spark-submit for now:
bin/spark-submit --master yarn-client --executor-memory 4G --executor-cores 12 --num-executors 10 --queue highpriority <path to python file>
Is there a setting I am missing for this APPLICATION_COMPLETE file to be created when a pyspark job completes? Thanks, Michael
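The thread does not resolve this, but one thing is worth ruling out. This is to the best of my knowledge and not confirmed here: in Spark 1.x the APPLICATION_COMPLETE marker is written when the SparkContext is stopped, so a PySpark script that ends without calling sc.stop() may leave it missing. To find affected runs, the hypothetical helper below scans a local copy of the event log directory. For a log directory on HDFS, you would first copy or list it locally.

```python
import os


def incomplete_applications(event_log_dir):
    """Names of application directories that have an event log but no completion marker.

    Assumes the Spark 1.1 layout shown in the tree output: one directory per
    application holding EVENT_LOG_1, SPARK_VERSION_<version> and, for cleanly
    finished applications, APPLICATION_COMPLETE.
    """
    incomplete = []
    for name in sorted(os.listdir(event_log_dir)):
        app_dir = os.path.join(event_log_dir, name)
        if not os.path.isdir(app_dir):
            continue
        files = set(os.listdir(app_dir))
        if "EVENT_LOG_1" in files and "APPLICATION_COMPLETE" not in files:
            incomplete.append(name)
    return incomplete
```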
Re: KafkaUtils not consuming all the data from all partitions
Hi Mukesh, If my understanding is correct, each stream only has a single receiver. So, if you have each receiver consuming 9 partitions, you need 10 input DStreams to create 10 concurrent receivers: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving Would you mind sharing a bit more on how you achieve this? -- FG On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hi Guys, I have a Kafka topic with 90 partitions, and I am running Spark Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10 Kafka receivers. My streaming job is running fine and there is no processing delay; it's just that data from some partitions never gets picked up. From the Kafka console I can see that each receiver is consuming data from 9 partitions, but the lag for some offsets keeps increasing. Below are my Kafka consumer parameters. Has any of you faced this kind of issue? If so, do you have any pointers to fix it?
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "3");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "2000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "1");
kafkaConf.put("rebalance.max.retries", "20");
-- Thanks Regards, Mukesh Jha me.mukesh@gmail.com
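FG's point can be checked with simple arithmetic. Suppose each receiver runs one consumer thread in the same consumer group, and partitions are split into contiguous ranges. That is an illustration of range-style assignment, not Kafka's actual assignor code. Then 90 partitions over 10 consumers gives 9 partitions each, which matches what the Kafka console reports. The programming guide linked above shows how to get several receivers: create several input DStreams and combine them with union.

```python
def range_assign(num_partitions, consumer_ids):
    """Split partitions 0..num_partitions-1 into contiguous ranges, one per consumer.

    Each consumer gets num_partitions // len(consumers) partitions, and the
    first num_partitions % len(consumers) consumers (in sorted order) get one extra.
    """
    consumers = sorted(consumer_ids)
    per_consumer, extra = divmod(num_partitions, len(consumers))
    assignment = {}
    start = 0
    for i, consumer in enumerate(consumers):
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = list(range(start, start + count))
        start += count
    return assignment
```

For example, range_assign(90, [f"receiver-{i:02d}" for i in range(10)]) assigns 9 partitions to each hypothetical receiver id.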
Re: disable log4j for spark-shell
Another option is to make a copy of log4j.properties in the current directory from which you start spark-shell, and change log4j.rootCategory=INFO, console to log4j.rootCategory=ERROR, console. Then start the shell. On Wed, Jan 7, 2015 at 3:39 AM, Akhil ak...@sigmoidanalytics.com wrote: Edit your conf/log4j.properties file and change the following line:
log4j.rootCategory=INFO, console
to
log4j.rootCategory=ERROR, console
Another approach would be to fire up spark-shell and type in the following:
import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
You won't see any logs after that. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/disable-log4j-for-spark-shell-tp11278p21010.html
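Both file-based suggestions come down to the same one-line edit of a copied log4j.properties, for instance one copied from Spark's conf/log4j.properties.template. The sketch below applies that edit to the file's text. The function name is hypothetical, and only the log4j.rootCategory line is touched.

```python
import re

# Matches the key and '=' of the root-category line, followed by its level name.
_ROOT = re.compile(r"^(log4j\.rootCategory\s*=\s*)[A-Za-z]+", re.MULTILINE)


def set_root_level(properties_text, level="ERROR"):
    """Return log4j.properties text with the log4j.rootCategory level replaced.

    The appender list (e.g. ', console') is kept, so
    'log4j.rootCategory=INFO, console' becomes 'log4j.rootCategory=ERROR, console'.
    """
    return _ROOT.sub(lambda m: m.group(1) + level, properties_text)
```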
akka.actor.ActorNotFound In Spark Streaming on Mesos (using ssc.actorStream)
Hi all, I am trying to run this example on Mesos: https://github.com/jaceklaskowski/spark-activator#master I have Mesos 0.21.0 (instead of 0.18.1; could that be a problem?). I downloaded the Spark prebuilt package spark-1.2.0-bin-hadoop2.4.tgz, untarred it, and created the conf/spark-env.sh file with the following lines:
export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
export SPARK_EXECUTOR_URI=/home/christophe/Development/spark-1.1.1-bin-hadoop2.4.tgz
I created and filled in the build.sbt (Spark 1.2.0 / Scala 2.11.4), and I am using src/main/scala/StreamingApp.scala (from the spark activator) as my main class in Spark. When I submit with .setMaster("local[*]"), the helloer actor is started at path=akka://sparkDriver/user/Supervisor0/helloer and it works fine. But when I submit with .setMaster("mesos://127.0.1.1:5050"), the helloer actor is started at path=akka://sparkExecutor/user/Supervisor0/helloer and I get the following log: Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka://sparkDriver/), Path(/user/Supervisor0/helloer)] The problem is probably the new path of my actor. It can't be reached by the following URL anymore (since its path is akka://sparkExecutor/user/Supervisor0/helloer):
val url = s"akka.tcp://sparkDriver@$driverHost:$driverPort/user/Supervisor0/$actorName"
I have tried many systemActor@host:port combinations, but I didn't manage to communicate with my actor. How can I reach my actor? Can Mesos 0.21.0 be the source of my problem? Have I misconfigured anything? Any ideas? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/akka-actor-ActorNotFound-In-Spark-Streaming-on-Mesos-using-ssc-actorStream-tp21014.html
Strange DAG scheduling behavior on currently dependent RDDs
We just updated to Spark 1.2.0 from Spark 1.1.0. We have a small framework that we've been developing that connects various RDDs together based on some predefined business cases. After updating to 1.2.0, some of the concurrency expectations about how the stages within jobs are executed have changed quite significantly. Given 3 RDDs:
RDD1 = inputFromHDFS().groupBy().sortBy().etc().cache()
RDD2 = RDD1.outputToFile
RDD3 = RDD1.groupBy().outputToFile
In Spark 1.1.0, we expected RDD1 to be scheduled based on the first stage encountered (RDD2's outputToFile or RDD3's groupBy()), and then for RDD2 and RDD3 to both block waiting for RDD1 to complete and cache, at which point RDD2 and RDD3 both use the cached version to complete their work. Spark 1.2.0 seems to schedule two (albeit concurrently running) stages for each of RDD1's stages (inputFromHDFS, groupBy(), sortBy(), etc() will each get run twice). It does not look like there is any sharing of the results between these jobs. Are we doing something wrong? Is there a setting that I'm not understanding somewhere?
Re: Why Parquet Predicate Pushdown doesn't work?
But Xuelin already posted in the original message that the code was using SET spark.sql.parquet.filterPushdown=true. On Wed, Jan 7, 2015 at 12:42 AM, Daniel Haviv danielru...@gmail.com wrote: Quoting Michael: Predicate push down into the input format is turned off by default because there is a bug in the current parquet library that null pointers when there are full row groups that are null. https://issues.apache.org/jira/browse/SPARK-4258 You can turn it on if you want: http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration Daniel On 7 Jan 2015, at 08:18, Xuelin Cao xuelin...@yahoo.com.INVALID wrote: Hi, I'm testing the Parquet file format, and predicate pushdown is a very useful feature for us. However, it looks like predicate pushdown doesn't work after I set sqlContext.sql("SET spark.sql.parquet.filterPushdown=true"). Here is my SQL: sqlContext.sql("select adId, adTitle from ad where groupId=10113000").collect Then I checked the amount of input data in the web UI, but the amount of input data is ALWAYS 80.2M regardless of whether I turn the spark.sql.parquet.filterPushdown flag on or off. I'm not sure whether there is anything I must do when generating the Parquet file in order to make predicate pushdown available. (For example, with ORC files I need to explicitly sort the field that will be used for predicate pushdown when creating the file.) Does anyone have any idea? And does anyone know the internal mechanism for Parquet predicate pushdown? Thanks
Spark Trainings/ Professional certifications
Hi, Can you please suggest some of the best available training/coaching options and professional certifications in Apache Spark? We are trying to run predictive analysis on our sales data and come up with recommendations (leads). We have tried running CF (collaborative filtering), but we end up getting absolutely bogus results!! What we are after is hands-on training that would let us do our job effectively. In addition, if this training could provide a firm grounding for a professional certification, that would be an added advantage. Thanks for your inputs. Regards, Saurabh Agrawal
Re: Understanding RDD.GroupBy OutOfMemory Exceptions
Hi Patrick: Do you know what the status of this issue is? Is there a JIRA that is tracking this issue? Thanks. Asim Patrick Wendell writes: Within a partition things will spill - so the current documentation is correct. This spilling can only occur *across keys* at the moment. Spilling cannot occur within a key at present. [...] Spilling within one key for GroupBy's is likely to end up in the next release of Spark, Spark 1.2. In most cases we see when users hit this, they are actually trying to just do aggregations which would be more efficiently implemented without the groupBy operator. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-tp11427p21016.html
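Patrick's last point can be sketched in plain Python, with lists standing in for partitions and no Spark involved:

- A groupBy-style sum holds every value for a key at once.
- A reduceByKey-style sum keeps one running total per key per partition, then merges the partial totals.

In PySpark, the corresponding operators would be groupByKey() followed by a sum, versus reduceByKey(add).

```python
from collections import defaultdict


def sum_by_group(partitions):
    """groupBy-style: collect every value per key, then sum (all values held at once)."""
    groups = defaultdict(list)
    for part in partitions:
        for key, value in part:
            groups[key].append(value)
    return {k: sum(vs) for k, vs in groups.items()}


def sum_by_combining(partitions):
    """reduceByKey-style: one running total per key per partition, then merge totals."""
    partials = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = acc.get(key, 0) + value
        partials.append(acc)
    merged = {}
    for acc in partials:
        for key, total in acc.items():
            merged[key] = merged.get(key, 0) + total
    return merged
```

Both return the same sums; the combining version never needs all values of a single key in memory at once.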
Re: Create DStream consisting of HDFS and (then) Kafka data
Hi, On Thu, Jan 8, 2015 at 2:19 PM, rekt...@voodoowarez.com wrote: dstream processing bulk HDFS data- is something I don't feel is super well socialized yet, fingers crossed that base gets built up a little more. Just out of interest (and hoping not to hijack my own thread), why are you not doing plain RDD processing when you are only processing HDFS data? What's the advantage of doing DStream? Thanks Tobias
Re: Why Parquet Predicate Pushdown doesn't work?
Yes, the problem is that I've already turned the flag on. One possible reason: Parquet supports predicate pushdown by storing min/max statistics for each column in each Parquet block. If, in my test, the rows with groupId=10113000 are scattered across all Parquet blocks, then predicate pushdown cannot skip any of them. But I'm not quite sure about that, and I don't know whether there is any other reason that could lead to this. On Wed, Jan 7, 2015 at 10:14 PM, Cody Koeninger c...@koeninger.org wrote: But Xuelin already posted in the original message that the code was using SET spark.sql.parquet.filterPushdown=true. On Wed, Jan 7, 2015 at 12:42 AM, Daniel Haviv danielru...@gmail.com wrote: Quoting Michael: Predicate push down into the input format is turned off by default because there is a bug in the current parquet library that null pointers when there are full row groups that are null. https://issues.apache.org/jira/browse/SPARK-4258 You can turn it on if you want: http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration Daniel On 7 Jan 2015, at 08:18, Xuelin Cao xuelin...@yahoo.com.INVALID wrote: Hi, I'm testing the Parquet file format, and predicate pushdown is a very useful feature for us. However, it looks like predicate pushdown doesn't work after I set sqlContext.sql("SET spark.sql.parquet.filterPushdown=true"). Here is my SQL: sqlContext.sql("select adId, adTitle from ad where groupId=10113000").collect Then I checked the amount of input data in the web UI, but the amount of input data is ALWAYS 80.2M regardless of whether I turn the spark.sql.parquet.filterPushdown flag on or off. I'm not sure whether there is anything I must do when generating the Parquet file in order to make predicate pushdown available. (For example, with ORC files I need to explicitly sort the field that will be used for predicate pushdown when creating the file.) Does anyone have any idea? And does anyone know the internal mechanism for Parquet predicate pushdown? Thanks
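Xuelin's hypothesis can be made concrete. With min/max statistics per block, an equality filter can only skip blocks whose range excludes the value:

- If groupId values are spread over every block, nothing is skipped.
- If the data is sorted or clustered by groupId before writing, most blocks could be skipped.

Below is a sketch of that pruning rule with made-up statistics. It is an illustration, not Spark's or Parquet's actual code.

```python
from dataclasses import dataclass


@dataclass
class BlockStats:
    """Hypothetical min/max statistics of one column within one Parquet block."""
    name: str
    min_value: int
    max_value: int


def blocks_to_read(stats, value):
    """Blocks whose [min, max] range may contain `value` for an equality filter."""
    return [s.name for s in stats if s.min_value <= value <= s.max_value]
```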
Can spark supports task level resource management?
Hi, We are currently building a mid-scale Spark cluster (100 nodes) in our company. One thing bothering us is how Spark manages resources (CPU, memory). I know there are 3 resource management modes: standalone, Mesos, and YARN. In standalone mode, the cluster master simply allocates the resources when the application is launched. In this mode, suppose an engineer launches a spark-shell claiming 100 CPU cores and 100G of memory, but then does nothing. The cluster master still allocates the resources to this app even though the spark-shell does nothing. This is definitely not what we want. What we want is for resources to be allocated when the actual task is about to run. For example, in the map stage the app may need 100 cores because the RDD has 100 partitions, while in the reduce stage only 20 cores are needed because the RDD is shuffled into 20 partitions. I'm not very clear about the granularity of Spark resource management. In standalone mode, the resources are allocated when the app is launched. What about Mesos and YARN? Can they support task-level resource management? And what is the recommended mode for resource management (Mesos? YARN?) Thanks
Re: Can spark supports task level resource management?
Hi Xuelin, I can only speak about Mesos mode. There are two modes of management in Spark's Mesos scheduler: fine-grained mode and coarse-grained mode. In fine-grained mode, each Spark task launches one or more Spark executors that only live through the lifetime of the task, so it's comparable to what you described. Coarse-grained mode is going to support dynamic allocation of executors, but that works at a higher level than tasks. As for a resource management recommendation, I think it's important to look at what other applications you want to run besides Spark in the same cluster, and at your use cases, to see which resource manager fits your needs. Tim On Wed, Jan 7, 2015 at 10:55 PM, Xuelin Cao xuelincao2...@gmail.com wrote: Hi, We are currently building a mid-scale Spark cluster (100 nodes) in our company. One thing bothering us is how Spark manages resources (CPU, memory). I know there are 3 resource management modes: standalone, Mesos, and YARN. In standalone mode, the cluster master simply allocates the resources when the application is launched. In this mode, suppose an engineer launches a spark-shell claiming 100 CPU cores and 100G of memory, but then does nothing. The cluster master still allocates the resources to this app even though the spark-shell does nothing. This is definitely not what we want. What we want is for resources to be allocated when the actual task is about to run. For example, in the map stage the app may need 100 cores because the RDD has 100 partitions, while in the reduce stage only 20 cores are needed because the RDD is shuffled into 20 partitions. I'm not very clear about the granularity of Spark resource management. In standalone mode, the resources are allocated when the app is launched. What about Mesos and YARN? Can they support task-level resource management? And what is the recommended mode for resource management (Mesos? YARN?) Thanks
Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD
I have not used CDH 5.3.0, but it looks like spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar contains some Hadoop 1 dependencies (coming from the wrong HBase version). I don't know the recommended way to build the spark-examples jar, because the official Spark docs do not mention how to build it. Personally, I would add -Dhbase.profile=hadoop2 to the build command so that the examples project uses a Hadoop 2-compatible HBase. Best Regards, Shixiong Zhu 2015-01-08 0:30 GMT+08:00 Antony Mayi antonym...@yahoo.com.invalid: Thanks, I found the issue: I was including /usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar in the classpath, and this was breaking it. Now I'm using a custom jar with just the Python converters, and everything works like a charm. Thanks, Antony. On Wednesday, 7 January 2015, 23:57, Sean Owen so...@cloudera.com wrote: Yes, the distribution is certainly fine and built for Hadoop 2. It sounds like you are inadvertently including Spark code compiled for Hadoop 1 when you run your app. The general idea is to use the cluster's copy at runtime. Those with more pyspark experience might be able to give more useful directions about how to fix that. On Wed, Jan 7, 2015 at 1:46 PM, Antony Mayi antonym...@yahoo.com wrote: This is the official Cloudera-compiled CDH 5.3.0 stack; nothing has been changed by me, and I presume they are pretty good at building it, so I still suspect the classpath now gets resolved in a different way? Thx, Antony. On Wednesday, 7 January 2015, 18:55, Sean Owen so...@cloudera.com wrote: Problems like this are always due to having code compiled for Hadoop 1.x run against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at runtime Hadoop 2.x is used. A common cause is actually bundling Spark / Hadoop classes with your app, when the app should just use the Spark / Hadoop provided by the cluster. It could also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x cluster.
On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, I am using newAPIHadoopRDD to load an RDD from HBase (using pyspark running as yarn-client), pretty much the standard case demonstrated in hbase_inputformat.py from the examples. The thing is that when trying the very same code on Spark 1.2, I am getting the error below, which, based on similar cases on other forums, suggests an incompatibility between MR1 and MR2. Why would this start happening now? Is it due to some change in resolving the classpath, which now picks up MR2 jars first while before it was MR1? Is there any workaround for this? Thanks, Antony. The error: py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. : java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.RDD.take(RDD.scala:1060) at org.apache.spark.rdd.RDD.first(RDD.scala:1093) at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202) at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500) at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
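The culprit here turned out to be a jar on the classpath that bundled Hadoop/HBase classes built for the other Hadoop line. It can therefore help to check which such classes a jar carries before adding it. A jar is a zip archive, so Python's zipfile module is enough. The function name and the package list are illustrative.

```python
import zipfile


def bundled_classes(jar_path, packages):
    """Class files inside a jar that belong to any of the given Java packages (or subpackages)."""
    prefixes = tuple(p.replace(".", "/") + "/" for p in packages)
    with zipfile.ZipFile(jar_path) as jar:
        return sorted(
            name for name in jar.namelist()
            if name.endswith(".class") and name.startswith(prefixes)
        )
```

For example, calling bundled_classes on /usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar with ["org.apache.hadoop.hbase", "org.apache.hadoop.mapreduce"] would list any such classes it bundles.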
Eclipse flags error on KafkaUtils.createStream()
Hi, I am using Eclipse to write Java code. I am trying to create a Kafka receiver with:
JavaPairReceiverInputDStream<String, kafka.message.Message> a = KafkaUtils.createStream(jssc, String.class, Message.class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK());
where jssc, kafkaParams, and topics are all properly defined. Eclipse flags it with the following message:
The type scala.reflect.ClassTag cannot be resolved. It is indirectly referenced from required .class files
I don't know Scala, and it seems that scala.reflect.ClassTag is an unusual class that cannot be imported simply with an import statement. I have import scala.reflect.*; but it doesn't help. In my pom.xml I have:
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-reflect</artifactId>
  <version>2.11.4</version>
</dependency>
That doesn't help either. Is Eclipse flagging a real problem? A solution suggested by Eclipse is to edit the Java Build Path using the UI below. However, I have no idea what to do.
I would rather use the API below, which doesn't require passing in StringDecoder and DefaultDecoder. However, the contents of my Kafka messages are not Strings. Is there any way to use this API with non-String Kafka messages?
public static JavaPairReceiverInputDStream<String,String> createStream(JavaStreamingContext jssc, String zkQuorum, String groupId, java.util.Map<String,Integer> topics, StorageLevel storageLevel)
Thanks!! KC
RE: MatrixFactorizationModel serialization
Try loading the features as
val userFeatures = sc.objectFile[(Int, Array[Double])](path1)
val productFeatures = sc.objectFile[(Int, Array[Double])](path2)
and then call the constructor of MatrixFactorizationModel with those.
-----Original Message-----
From: wanbo [gewa...@163.com]
Sent: Wednesday, January 07, 2015 10:54 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Re: MatrixFactorizationModel serialization
[...]
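The reload snippet quoted in this thread calls objectFile but never uses the RDDs it returns, so nothing is actually restored into the model; the point of the reply is to keep the loaded feature collections and build the model from them. Below is a Spark-free sketch of that idea, assuming (as MatrixFactorizationModel's predict does) that a predicted rating is the dot product of the user and product factor vectors. Plain dicts, pickle, and local files stand in for the feature RDDs, saveAsObjectFile, and objectFile; all names and numbers are hypothetical.

```python
import os
import pickle
import tempfile

# Hypothetical stand-ins for the two saved RDDs (rank = 2):
# user id -> factor vector, product id -> factor vector.
user_features = {1: [0.5, 1.0], 2: [1.5, -0.5]}
product_features = {10: [2.0, 0.0], 20: [1.0, 1.0]}

def save_features(path, features):
    with open(path, "wb") as f:
        pickle.dump(features, f)

def load_features(path):
    with open(path, "rb") as f:
        return pickle.load(f)

def predict(user_feats, product_feats, user, product):
    """Predicted rating = dot product of the user and product factor vectors."""
    return sum(u * p for u, p in zip(user_feats[user], product_feats[product]))

with tempfile.TemporaryDirectory() as d:
    save_features(os.path.join(d, "userfeatures"), user_features)
    save_features(os.path.join(d, "productfeatures"), product_features)
    # Keep the *returned* values: these are what the model must be built from.
    users = load_features(os.path.join(d, "userfeatures"))
    products = load_features(os.path.join(d, "productfeatures"))

print(predict(users, products, 1, 20))  # 0.5*1.0 + 1.0*1.0 = 1.5
```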
Create DStream consisting of HDFS and (then) Kafka data
Hi, I have a setup where data from an external stream is piped into Kafka and also written to HDFS periodically for long-term storage. Now I am trying to build an application that will first process the HDFS files and then switch to Kafka, continuing with the first item that was not yet in HDFS. (The items have an increasing timestamp that I can use to find the first item not yet processed.) I am wondering what an elegant way to provide a unified view of the data would be. Currently, I am using two StreamingContexts, one after another:
- start StreamingContext A and process all data found in HDFS (updating the largest seen timestamp in an accumulator), stopping when an RDD with 0 items arrives,
- stop StreamingContext A,
- start a new StreamingContext B and process the Kafka stream (filtering out all items with a timestamp smaller than the value in the accumulator),
- stop when the user requests it.
This *works* as it is now, but I was planning to add sliding windows (based on item counts or on the timestamps in the data), which I guess will get unmanageably complicated when a window spans data in both HDFS and Kafka. Therefore I would like to have a single DStream that is fed first with HDFS and then with Kafka data. Does anyone have a suggestion on how to realize that (with as little copying of private[spark] classes as possible)? I guess one issue is that Kafka processing requires a receiver and therefore needs to be treated quite differently from HDFS? Thanks Tobias
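The hand-off described above can be sketched as one sequence: replay the historical items while tracking the largest timestamp (the accumulator's role), then pass through only live items newer than that watermark. Plain Python iterables stand in for the HDFS and Kafka DStreams here, so this is not a Spark API; it only illustrates the ordering a single unified DStream would need to provide so that windows can span both sources. It assumes strictly increasing timestamps, so an item whose timestamp equals the watermark is treated as already archived; the `(timestamp, payload)` item shape and function name are hypothetical.

```python
from typing import Iterable, Iterator, Tuple

Item = Tuple[int, str]  # hypothetical item shape: (timestamp, payload)

def unified_stream(historical: Iterable[Item], live: Iterable[Item]) -> Iterator[Item]:
    """Yield all historical items, then only live items newer than the last historical one."""
    watermark = None  # largest timestamp seen in the historical data
    for ts, payload in historical:
        watermark = ts if watermark is None else max(watermark, ts)
        yield ts, payload
    for ts, payload in live:
        # Skip live items that were already covered by the historical data.
        if watermark is None or ts > watermark:
            yield ts, payload

hdfs_items = [(1, "a"), (2, "b"), (3, "c")]               # already archived
kafka_items = [(2, "b"), (3, "c"), (4, "d"), (5, "e")]    # Kafka still holds some archived items
print(list(unified_stream(hdfs_items, kafka_items)))
# [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
```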
Re: Create DStream consisting of HDFS and (then) Kafka data
I've started one or two emails asking more broadly: what are good practices for doing DStream computations in a non-realtime fashion? I'd love to have a good intro article to pass around to people, and some resources for others chasing this problem. Back when I was working with Storm, managing the flow of time by passing it as an input and output field through individual components was an absolute necessity for us: first, it was needed just to run the analytics with consistency, and second, I thought we'd be mad to build a system that could only run at real-time speed on real-time data, so making time a first-class piece of data flowing through the system was an obvious move that fixed both. Spark DStreams, on the other hand, have a much more discrete sense of time (see what I did there?). I feel there's pretty good coverage of the straightforward realtime use, but to be really interesting and deployable, a better understanding of running DStreams in a non-realtime fashion (after-the-fact replay at faster than realtime), of what a DStream wants and needs, and some writeup of the gotchas are integral to making DStreams well suited to tackling the lambda architecture broadly. 2c. Thanks a dozen for the far more nuanced, super interesting question, Tobias! Just writing in to say that even getting to where Tobias is (DStream processing of bulk HDFS data) is something I don't feel is well socialized yet; fingers crossed that this base gets built up a little more. -rektide
On Thu, Jan 08, 2015 at 01:53:21PM +0900, Tobias Pfeiffer wrote:
[...]
Re: Can spark supports task level resource management?
Hi, Thanks for the information. One more thing I want to clarify: when do Mesos or YARN allocate and release resources? That is, what is the resource lifetime? For example, in standalone mode, resources are allocated when the application is launched and released when the application finishes. It looks like, in Mesos fine-grained mode, resources are allocated when a task is about to run and released when the task finishes. What about Mesos coarse-grained mode and YARN mode? Are resources managed at the job level, i.e. does the resource lifetime equal the job lifetime? Or at the stage level? One more question about Mesos fine-grained mode: how large is the overhead of resource allocation and release? In MapReduce, a noticeable amount of time is spent waiting for resource allocation. What is it like in Mesos fine-grained mode?
On Thu, Jan 8, 2015 at 3:07 PM, Tim Chen t...@mesosphere.io wrote:
Hi Xuelin, I can only speak about Mesos mode. There are two management modes in Spark's Mesos scheduler: fine-grained mode and coarse-grained mode. In fine-grained mode, each Spark task launches one or more Spark executors that only live for the lifetime of the task, so it's comparable to what you describe. Coarse-grained mode is going to support dynamic allocation of executors, but that works at a higher level than tasks. As for a resource management recommendation, I think it's important to consider what other applications you want to run besides Spark in the same cluster, as well as your use cases, to see which resource manager fits your needs. Tim
On Wed, Jan 7, 2015 at 10:55 PM, Xuelin Cao xuelincao2...@gmail.com wrote:
Hi, Currently, we are building a medium-scale Spark cluster (100 nodes) in our company. One thing bothering us is how Spark manages resources (CPU, memory). I know there are 3 resource management modes: standalone, Mesos, and YARN. In standalone mode, the cluster master simply allocates resources when the application is launched. In this mode, suppose an engineer launches a spark-shell claiming 100 CPU cores and 100G of memory, but does nothing. The cluster master still allocates the resources to this app even though the spark-shell does nothing. This is definitely not what we want. What we want is for resources to be allocated when an actual task is about to run. For example, in the map stage, the app may need 100 cores because the RDD has 100 partitions, while in the reduce stage, only 20 cores are needed because the RDD is shuffled into 20 partitions. I'm not very clear about the granularity of Spark resource management. In standalone mode, resources are allocated when the app is launched. What about Mesos and YARN? Can they support task-level resource management? And what is the recommended mode for resource management (Mesos? YARN?) Thanks
Re: Can spark supports task level resource management?
In coarse-grained mode, the Spark executors are launched and kept running while the scheduler is running. So if you launch a Spark shell and leave it open, the executors keep running and won't finish until the shell exits. In fine-grained mode, the overhead mostly comes from downloading the Spark tarball (if it's not already deployed on the slaves) and launching the Spark executor. I suggest you try it out and measure the latency to see whether it fits your use case. Tim
On Wed, Jan 7, 2015 at 11:19 PM, Xuelin Cao xuelincao2...@gmail.com wrote:
[...]
[Spark Core] function SparkContext.cancelJobGroup(groupId) doesn't work
Hi all: In my application, I start a SparkContext sc and execute some tasks on sc. (Each task is a thread, which executes some transformations and actions on RDDs.) For each task, I use sc.setJobGroup(JOB_GROUPID, JOB_DESCRIPTION) to set its job group. But when I call sc.cancelJobGroup(JOB_GROUPID) to try to cancel the task, it doesn't work. Is this a mistake in my usage, or is there a bug? Thanks
RE: Spark with Hive cluster dependencies
You can also follow the link below. It works on a standalone Spark cluster.
https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started
thanks Somnath
From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Thursday, January 08, 2015 2:21 AM
To: jamborta
Cc: user
Subject: Re: Spark with Hive cluster dependencies
[...]
Re: MatrixFactorizationModel serialization
I save and reload the model like this:
val bestModel = ALS.train(training, rank, numIter, lambda)
bestModel.get.userFeatures.saveAsObjectFile("hdfs://***:9000/spark/results/userfeatures")
bestModel.get.productFeatures.saveAsObjectFile("hdfs://***:9000/spark/results/productfeatures")
val bestModel = obj.asInstanceOf[MatrixFactorizationModel]
bestModel.userFeatures.sparkContext.objectFile("hdfs://***:9000/spark/results/userfeatures")
bestModel.productFeatures.sparkContext.objectFile("hdfs://***:9000/spark/results/productfeatures")
But I get the same exception:
Exception in thread "Driver" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162)
Caused by: java.lang.NullPointerException
    at com.ft.jobs.test.ModelDeserialization$.main(ModelDeserialization.scala:138)
    at com.ft.jobs.test.ModelDeserialization.main(ModelDeserialization.scala)
    ... 5 more
Have you fixed this issue?
Re: Create DStream consisting of HDFS and (then) Kafka data
On Thu, Jan 08, 2015 at 02:33:30PM +0900, Tobias Pfeiffer wrote:
Hi, On Thu, Jan 8, 2015 at 2:19 PM, rekt...@voodoowarez.com wrote:
dstream processing bulk HDFS data- is something I don't feel is super well socialized yet, fingers crossed that base gets built up a little more.
Just out of interest (and hoping not to hijack my own thread), why are you not doing plain RDD processing when you are only processing HDFS data? What's the advantage of doing DStream? Thanks Tobias
Like you, in the old Storm use case we were doing a lot of windowing functions, etc. We want a consistent discretization process for all our intake data, whether it's realtime or not, and we want to use the same discretized-stream tech whether we're discretizing here and now or discretizing historical data. Only then is the Lambda beast anywhere near slain. To the single system. o7 -rektide
Re: Can spark supports task level resource management?
Hi Xuelin, Spark 1.2 includes a dynamic allocation feature that allows Spark on YARN to modulate its YARN resource consumption as the demands of the application grow and shrink. This is somewhat coarser than what you call task-level resource management: elasticity comes through allocating and releasing executors, not through requesting resources from YARN for individual tasks. It would be good to add finer-grained task-level elasticity as well, but this will rely on some YARN work (YARN-1197) for changing the resource allocation of a running container. Mesos has a fine-grained mode similar to what you're wondering about. It's documented here: https://spark.apache.org/docs/latest/running-on-mesos.html#mesos-run-modes. -Sandy
On Wed, Jan 7, 2015 at 10:55 PM, Xuelin Cao xuelincao2...@gmail.com wrote:
[...]
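As a rough illustration of the two knobs discussed in this thread, here are the relevant properties as they might appear in spark-defaults.conf, produced by a small hypothetical helper (`render_spark_defaults`). The property names are Spark configuration keys; the executor bounds are arbitrary example values, not recommendations. According to the Spark 1.2 documentation, dynamic allocation on YARN also requires Spark's external shuffle service to be installed as an auxiliary service on each NodeManager, which is not shown here. On Mesos, spark.mesos.coarse=false selects the fine-grained mode linked above.

```python
def render_spark_defaults(props):
    """Render a dict of Spark properties as spark-defaults.conf lines ("key value")."""
    return "\n".join(f"{key} {value}" for key, value in sorted(props.items()))

# YARN (Spark 1.2): grow and shrink the number of executors with the workload.
yarn_dynamic_allocation = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "2",   # example lower bound
    "spark.dynamicAllocation.maxExecutors": "50",  # example upper bound
    # Lets executors be released without losing the shuffle files they wrote.
    "spark.shuffle.service.enabled": "true",
}

# Mesos: false = fine-grained mode, true = coarse-grained mode.
mesos_fine_grained = {"spark.mesos.coarse": "false"}

print(render_spark_defaults(yarn_dynamic_allocation))
print(render_spark_defaults(mesos_fine_grained))
```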
Re: Can spark supports task level resource management?
Got it, thanks.
On Thu, Jan 8, 2015 at 3:30 PM, Tim Chen t...@mesosphere.io wrote:
[...]
[MLlib] Scoring GBTs with a variable number of trees
Hi All, I wonder if anyone has experience building Gradient Boosted Tree models in MLlib and can help me out. I'm trying to create a plot of the test error rate of a Gradient Boosted Tree model as a function of the number of trees, to determine the optimal number of trees in the model. Does Spark calculate (and store!) the error rate on each iteration of model building? Can I get at those values somehow? Alternatively, having constructed a model, is it possible to score with only a fixed number of trees? E.g. I built a model with 1000 trees, but I only want to score the data with the first 100 trees. I could calculate the needed quantities by hand if I could do that in some way. The optimal number of trees in a GBM is typically determined by calculating the mean squared error (MSE) on held-out data at each iteration while building the model. The model is then considered optimal where the MSE is at its minimum, i.e. in a plot of MSE vs. number of trees, the error rate will decrease (as the model improves), hit a minimum (the optimal point), and then increase (as the model starts to overfit the data). cheers chris
Christopher Thom, QUANTIUM
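Scoring with only the first k trees amounts to truncating the ensemble sum. The sketch below assumes a regression model whose prediction is the weighted sum of its trees' predictions, which is how MLlib's gradient-boosted trees combine trees (in the Scala API of 1.2 the fitted model exposes `trees` and `treeWeights`, but check this against your version). Trees are plain Python callables here, and `staged_mse` and `best_num_trees` are hypothetical names. Keeping a running sum per test point yields the whole MSE-vs-trees curve in one pass instead of rescoring from scratch for each k.

```python
from typing import Callable, List, Sequence, Tuple

Tree = Callable[[Sequence[float]], float]  # hypothetical stand-in for one fitted tree

def staged_mse(trees: List[Tree], weights: List[float],
               data: List[Tuple[Sequence[float], float]]) -> List[float]:
    """Return the test MSE after 1, 2, ..., len(trees) trees (weighted sum of trees)."""
    running = [0.0] * len(data)  # partial ensemble prediction for each test point
    curve = []
    for tree, w in zip(trees, weights):
        for i, (features, _) in enumerate(data):
            running[i] += w * tree(features)
        mse = sum((p - label) ** 2 for p, (_, label) in zip(running, data)) / len(data)
        curve.append(mse)
    return curve

def best_num_trees(curve: List[float]) -> int:
    """Number of trees (1-based) at which the test MSE is smallest."""
    return min(range(len(curve)), key=curve.__getitem__) + 1

t1 = lambda x: 2.0                            # first tree: a constant
t2 = lambda x: -1.0 if x[0] < 0.5 else 1.0    # second tree corrects the residuals
t3 = lambda x: 0.5                            # third tree pushes predictions off again
test = [([0.0], 1.0), ([1.0], 3.0)]           # (features, label) pairs
curve = staged_mse([t1, t2, t3], [1.0, 1.0, 1.0], test)
print(curve)                  # [1.0, 0.0, 0.25]
print(best_num_trees(curve))  # 2
```

In Spark, the same running-sum idea could be applied per partition of an RDD of (features, label) pairs and the squared errors summed across partitions, though that part is not shown here.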
spark 1.1 got error when working with cdh5.3.0 standalone mode
Hi, I installed CDH 5.3.0 core + HBase on a new EC2 cluster. Then I manually installed Spark 1.1 on it, but when I started the slaves, I got the following error:
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
Error: Could not find or load main class s.rolling.maxRetainedFiles=3
This Spark was compiled against Hadoop 2.5 + HBase 0.98.6, as in CDH 5.3.0. Is the error caused by some mysterious conflict somewhere? Or should I use the Spark in CDH 5.3.0 to be safe? Thanks!
Re: spark 1.1 got error when working with cdh5.3.0 standalone mode
This could be caused by many things, including wrong configuration. Hard to tell with just the info you provided. Is there any reason why you want to use your own Spark instead of the one shipped with CDH? CDH 5.3 has Spark 1.2, so unless you really need to run Spark 1.1, you should be better off with the CDH version.
On Wed, Jan 7, 2015 at 4:45 PM, freedafeng freedaf...@yahoo.com wrote:
[...]
-- Marcelo
Re: What does (### skipped) mean in the Spark UI?
Sorry, replace ### with an actual number. What does a skipped stage mean? I'm running a series of jobs, and it seems that after a certain point, the number of skipped stages is larger than the number of actually completed stages.
On Wed, Jan 7, 2015 at 3:28 PM, Ted Yu yuzhih...@gmail.com wrote:
Looks like the number of skipped stages couldn't be formatted. Cheers
On Wed, Jan 7, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote:
We just upgraded to Spark 1.2.0 and we're seeing this in the UI.
Spark with Cassandra - Shuffle opening too many files
Hello, We are currently running our data pipeline on Spark, using Cassandra as the data source. We are facing an issue with the step where we create an RDD from data in a Cassandra table and then run flatMapToPair to transform the data: we run into "Too many open files". I have already increased the file limits on all the worker and master nodes by changing /etc/system/limits.conf to 65K, but that did not help. Is there some setting with which we can restrict the shuffle? Also, when we use the log4j.properties in the conf folder, these logs are not emitted.
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 1.0 failed 4 times, most recent failure: Lost task 20.3 in stage 1.0 (TID 51, ip-10-87-36-147.us-west-2.aws.neustar.com): java.io.FileNotFoundException: /tmp/spark-local-20150107203209-9333/2f/shuffle_0_20_1017 (Too many open files)
    java.io.FileOutputStream.open(Native Method)
    java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
    org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
    org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
    org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)
    scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)
Thanks & Regards Ankur
Re: Spark with Hive cluster dependencies
Have you looked at Spark SQL http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables? It supports HiveQL, can read from the hive metastore, and does not require hadoop. On Wed, Jan 7, 2015 at 8:27 AM, jamborta jambo...@gmail.com wrote: Hi all, We have been building a system where we heavily rely on hive queries executed through spark to load and manipulate data, running on CDH and yarn. I have been trying to explore lighter setups where we would not have to maintain a hadoop cluster, just run the system on spark only. Is it possible to run spark standalone and set up hive alongside, without the hadoop cluster? If not, any suggestions on how we can replicate the convenience of hive tables (and hive sql) without hive? thanks,
Re: What does (### skipped) mean in the Spark UI?
+Josh, who added the Job UI page. I've seen this as well and was a bit confused about what it meant. Josh, is there a specific scenario that creates these skipped stages in the Job UI? Thanks Shivaram On Wed, Jan 7, 2015 at 12:32 PM, Corey Nolet cjno...@gmail.com wrote: Sorry- replace ### with an actual number. What does a skipped stage mean? I'm running a series of jobs and it seems like after a certain point, the number of skipped stages is larger than the number of actual completed stages. On Wed, Jan 7, 2015 at 3:28 PM, Ted Yu yuzhih...@gmail.com wrote: Looks like the number of skipped stages couldn't be formatted. Cheers On Wed, Jan 7, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote: We just upgraded to Spark 1.2.0 and we're seeing this in the UI.
Re: Elastic allocation(spark.dynamicAllocation.enabled) results in task never being executed.
Did you end up getting it working? By the way, this might be a nicer view of the docs: https://github.com/apache/spark/blob/60e2d9e2902b132b14191c9791c71e8f0d42ce9d/docs/job-scheduling.md We will update the latest Spark docs to include this shortly. -Andrew 2015-01-04 4:44 GMT-08:00 Tsuyoshi Ozawa oz...@apache.org: Please check the document added by Andrew. I could run tasks with Spark 1.2.0. * https://github.com/apache/spark/pull/3731/files#diff-c3cbe4cabe90562520f22d2306aa9116R86 * https://github.com/apache/spark/pull/3757/files#diff-c3cbe4cabe90562520f22d2306aa9116R101 Thanks, - Tsuyoshi On Sun, Jan 4, 2015 at 11:54 AM, firemonk9 dhiraj.peech...@gmail.com wrote: I am running into a similar problem. Have you found any resolution to this issue?
What does (### skipped) mean in the Spark UI?
We just upgraded to Spark 1.2.0 and we're seeing this in the UI.
Re: What does (### skipped) mean in the Spark UI?
Looks like the number of skipped stages couldn't be formatted. Cheers On Wed, Jan 7, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote: We just upgraded to Spark 1.2.0 and we're seeing this in the UI.
Re: KafkaUtils not consuming all the data from all partitions
I understand that I have to create 10 parallel streams. My code is running fine when the number of partitions is ~20, but when I increase the number of partitions I keep running into this issue. Below is my code to create the Kafka streams, along with the configs used.
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "3");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "2000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "1");
kafkaConf.put("rebalance.max.retries", "20");
String[] topics = kafkaTopicsList;
int numStreams = numKafkaThreads; // this is *10*
Map<String, Integer> topicMap = new HashMap<>();
for (String topic : topics) {
    topicMap.put(topic, numStreams);
}
List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
    kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
        StorageLevel.MEMORY_ONLY_SER()));
}
JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);
On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, Could you add the code where you create the Kafka consumer? -kr, Gerard. On Wed, Jan 7, 2015 at 3:43 PM, francois.garil...@typesafe.com wrote: Hi Mukesh, If my understanding is correct, each Stream only has a single Receiver. So, if you have each receiver consuming 9 partitions, you need 10 input DStreams to create 10 concurrent receivers: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving Would you mind sharing a bit more on how you achieve this?
-- FG On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hi Guys, I have a Kafka topic with 90 partitions, and I am running Spark Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10 Kafka receivers. My streaming job is running fine and there is no delay in processing; it's just that data from some partitions is never getting picked up. From the Kafka console I can see that each receiver is consuming data from 9 partitions, but the lag for some offsets keeps on increasing. Below are my Kafka consumer parameters. Have any of you faced this kind of issue? If so, do you have any pointers to fix it?
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "3");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "2000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "1");
kafkaConf.put("rebalance.max.retries", "20");
-- Thanks Regards, Mukesh Jha me.mukesh@gmail.com -- Thanks Regards, Mukesh Jha me.mukesh@gmail.com
Re: KafkaUtils not consuming all the data from all partitions
- You are launching up to 10 threads/topic per Receiver. Are you sure your receivers can support 10 threads each? (i.e. in the default configuration, do they have 10 cores?) If they have 2 cores, that would explain why this works with 20 partitions or less.
- If you have 90 partitions, why start 10 Streams, each consuming 10 partitions, and then removing the stream at index 0? Why not simply start 10 streams with 9 partitions? Or, more simply,
val kafkaStreams = (1 to numPartitions).map { _ => KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER) }
- You’re consuming up to 10 local threads *per topic*, on each of your 10 receivers. That’s a lot of threads (10 * size of kafkaTopicsList) co-located on a single machine. You mentioned having a single Kafka topic with 90 partitions. Why not have a single-element topicMap?
-- FG On Wed, Jan 7, 2015 at 4:05 PM, Mukesh Jha me.mukesh@gmail.com wrote: I understand that I have to create 10 parallel streams. My code is running fine when the number of partitions is ~20, but when I increase the number of partitions I keep running into this issue. Below is my code to create the Kafka streams, along with the configs used.
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "3");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "2000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "1");
kafkaConf.put("rebalance.max.retries", "20");
String[] topics = kafkaTopicsList;
int numStreams = numKafkaThreads; // this is *10*
Map<String, Integer> topicMap = new HashMap<>();
for (String topic : topics) {
    topicMap.put(topic, numStreams);
}
List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
    kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
        StorageLevel.MEMORY_ONLY_SER()));
}
JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);
On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, Could you add the code where you create the Kafka consumer? -kr, Gerard. On Wed, Jan 7, 2015 at 3:43 PM, francois.garil...@typesafe.com wrote: Hi Mukesh, If my understanding is correct, each Stream only has a single Receiver. So, if you have each receiver consuming 9 partitions, you need 10 input DStreams to create 10 concurrent receivers: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving Would you mind sharing a bit more on how you achieve this? -- FG On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hi Guys, I have a Kafka topic with 90 partitions, and I am running Spark Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10 Kafka receivers.
My streaming job is running fine and there is no delay in processing; it's just that data from some partitions is never getting picked up. From the Kafka console I can see that each receiver is consuming data from 9 partitions, but the lag for some offsets keeps on increasing. Below are my Kafka consumer parameters. Have any of you faced this kind of issue? If so, do you have any pointers to fix it?
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "3");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "2000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "1");
kafkaConf.put("rebalance.max.retries", "20");
-- Thanks Regards, Mukesh Jha me.mukesh@gmail.com -- Thanks Regards, Mukesh Jha me.mukesh@gmail.com
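To make FG's thread arithmetic concrete, here is a minimal Python sketch of how partitions might be spread over the consumer threads of one consumer group. It assumes a range-style assignment like the Kafka 0.8 high-level consumer's default (each thread gets a contiguous block of N / T partitions, and the first N % T threads get one extra) and simplifies thread ordering to plain indices; `range_assign` and `summarize` are hypothetical helpers, not Kafka or Spark APIs. With 10 receivers each requesting 10 threads for a single 90-partition topic, the group has 100 threads, so 10 of them would own no partition at all. This illustrates the over-subscription only; it does not by itself explain the growing lag.

```python
def range_assign(num_partitions, num_threads):
    """Split partitions 0..num_partitions-1 into contiguous ranges, one per
    consumer thread (range-style assignment, threads identified by index)."""
    per_thread, extra = divmod(num_partitions, num_threads)
    assignment = []
    for i in range(num_threads):
        # The first `extra` threads each receive one additional partition.
        start = per_thread * i + min(i, extra)
        count = per_thread + (1 if i < extra else 0)
        assignment.append(list(range(start, start + count)))
    return assignment


def summarize(num_partitions, receivers, threads_per_receiver):
    """Return (total consumer threads, threads that own no partition)."""
    total_threads = receivers * threads_per_receiver
    assignment = range_assign(num_partitions, total_threads)
    idle = sum(1 for parts in assignment if not parts)
    return total_threads, idle


if __name__ == "__main__":
    # Setup from the thread: one topic, 90 partitions, 10 receivers,
    # topicMap = {topic: 10} on every receiver.
    print(summarize(90, 10, 10))  # (100, 10)
    # FG's alternative: one receiver per partition, one thread each.
    print(summarize(90, 90, 1))   # (90, 0)
```

Under this model, sizing the total thread count to the partition count (or a divisor of it) leaves no thread idle, which is the direction FG's suggestions point in.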
Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD
Yes, the distribution is certainly fine and built for Hadoop 2. It sounds like you are inadvertently including Spark code compiled for Hadoop 1 when you run your app. The general idea is to use the cluster's copy at runtime. Those with more pyspark experience might be able to give more useful directions about how to fix that. On Wed, Jan 7, 2015 at 1:46 PM, Antony Mayi antonym...@yahoo.com wrote: This is the official Cloudera-compiled stack, CDH 5.3.0. Nothing has been done by me, and I presume they are pretty good at building it, so I still suspect the classpath now gets resolved in a different way? thx, Antony. On Wednesday, 7 January 2015, 18:55, Sean Owen so...@cloudera.com wrote: Problems like this are always due to having code compiled for Hadoop 1.x run against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at runtime Hadoop 2.x is used. A common cause is actually bundling Spark / Hadoop classes with your app, when the app should just use the Spark / Hadoop provided by the cluster. It could also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x cluster. On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, I am using newAPIHadoopRDD to load an RDD from HBase (using pyspark running as yarn-client), pretty much the standard case demonstrated in hbase_inputformat.py from the examples. The thing is that when trying the very same code on Spark 1.2, I am getting the error below, which, based on similar cases on other forums, suggests an incompatibility between MR1 and MR2. Why would this now start happening? Is it due to some change in resolving the classpath, which now picks up MR2 jars first while before it was MR1? Is there any workaround for this? thanks, Antony. the error: py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.RDD.take(RDD.scala:1060) at org.apache.spark.rdd.RDD.first(RDD.scala:1093) at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202) at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500) at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
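One way to test Sean's explanation against a concrete classpath: `org.apache.hadoop.mapreduce.JobContext` is a class in Hadoop 1 (MR1) and an interface in Hadoop 2 (MR2), which is exactly the mismatch the `IncompatibleClassChangeError` reports. The Python sketch below is a hypothetical diagnostic, not part of Spark, Hadoop, or CDH: it opens each jar with `zipfile`, walks the class file's constant pool to reach `access_flags`, and reports whether `ACC_INTERFACE` (0x0200) is set. Which jars to pass in (for example everything on the driver and executor classpath) is an assumption left to the reader; a jar that reports "a class" is one that bundles Hadoop 1 MapReduce classes.

```python
import struct
import sys
import zipfile

ACC_INTERFACE = 0x0200

# Sizes (in bytes, after the 1-byte tag) of fixed-size constant-pool entries,
# per the JVM class file format. Tag 1 (Utf8) is variable-length.
_FIXED_SIZES = {3: 4, 4: 4, 5: 8, 6: 8, 7: 2, 8: 2, 9: 4, 10: 4, 11: 4,
                12: 4, 15: 3, 16: 2, 17: 4, 18: 4, 19: 2, 20: 2}


def class_access_flags(data):
    """Return the access_flags field of a .class file given its raw bytes."""
    magic, _minor, _major, cp_count = struct.unpack_from(">IHHH", data, 0)
    if magic != 0xCAFEBABE:
        raise ValueError("not a class file")
    offset, index = 10, 1
    while index < cp_count:
        tag = data[offset]
        offset += 1
        if tag == 1:  # CONSTANT_Utf8: u2 length followed by the bytes
            (length,) = struct.unpack_from(">H", data, offset)
            offset += 2 + length
        elif tag in _FIXED_SIZES:
            offset += _FIXED_SIZES[tag]
        else:
            raise ValueError("unknown constant pool tag %d" % tag)
        # Long (5) and Double (6) entries occupy two constant-pool slots.
        index += 2 if tag in (5, 6) else 1
    (flags,) = struct.unpack_from(">H", data, offset)
    return flags


def is_interface_in_jar(jar_path, class_name):
    """True if class_name is an interface in the jar, False if it is a class,
    None if the jar does not contain it."""
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        if entry not in jar.namelist():
            return None
        return bool(class_access_flags(jar.read(entry)) & ACC_INTERFACE)


def main(argv):
    """argv: [CLASS_NAME, JAR, ...]; prints the kind of CLASS_NAME per jar."""
    if len(argv) < 2:
        print("usage: class_kind.py CLASS_NAME JAR [JAR ...]")
        return 1
    target, jars = argv[0], argv[1:]
    for path in jars:
        result = is_interface_in_jar(path, target)
        if result is not None:
            kind = "an interface" if result else "a class"
            print("%s: %s is %s" % (path, target, kind))
    return 0


if __name__ == "__main__":
    main(sys.argv[1:])
```

For example (the script name is hypothetical): `python class_kind.py org.apache.hadoop.mapreduce.JobContext /path/to/jars/*.jar`.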
Re: Saving partial (top 10) DStream windows to hdfs
Hi, I applied it as follows:
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
val topCounts = sortedCounts.mapPartitions(rdd => rdd.take(10))
//val topCounts = sortedCounts.transform(rdd => ssc.sparkContext.makeRDD(rdd.take(10)))
topCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
topCounts.print()
It gives the output with 10 extra values. I think it works on each partition of the RDD rather than on the whole RDD. I also tried the commented code. It gives the correct result, but at the start it gives a serialisation error: ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext Output for code in red (the values in green look extra to me):
0,578 -3,576 4,559 -1,556 3,553 -6,540 6,538 -4,535 1,526 10,483 94,8 -113,8 -137,8 -85,8 -91,8 -121,8 114,8 108,8 93,8 101,8 1,128 -8,118 3,112 -4,110 -13,108 4,108 -3,107 -10,107 -6,106 8,105 76,6 74,6 60,6 52,6 70,6 71,6 -60,6 55,6 78,5 64,5
and so on. Regards, Laeeq On Tuesday, January 6, 2015 9:06 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can try something like: val top10 = your_stream.mapPartitions(rdd => rdd.take(10)) Thanks Best Regards On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote: Hi, I am counting values in each window and finding the top values, and I want to save only the 10 most frequent values of each window to HDFS rather than all the values.
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
//sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
I can print the top 10 as above in red. I have also tried
sortedCounts.foreachRDD { rdd => ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1)) }
but I get the following error: 15/01/05 17:12:23 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext Regards, Laeeq
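The extra values are what `mapPartitions(rdd => rdd.take(10))` should be expected to produce: despite the parameter name, the function receives an iterator over one partition, so it keeps up to 10 elements from every partition rather than 10 per RDD (the posted output appears to contain 40 pairs, which would fit four partitions). The pure-Python sketch below models an RDD as a list of partitions, an assumption for illustration with no Spark involved, and contrasts the two behaviours. On the Spark side, the commented `transform` variant is the per-RDD version; a commonly suggested tweak is to build the result with `rdd.sparkContext.parallelize(rdd.take(10))` so the closure no longer captures `ssc`, but whether that alone clears this `NotSerializableException` is an assumption.

```python
import itertools


def take_per_partition(partitions, n):
    """Model of rdd.mapPartitions(it => it.take(n)): up to n items from EACH partition."""
    return [item for part in partitions for item in part[:n]]


def take_first(partitions, n):
    """Model of rdd.take(n): the first n items of the whole RDD, partitions in order."""
    return list(itertools.islice(itertools.chain.from_iterable(partitions), n))


if __name__ == "__main__":
    # Four hypothetical partitions of (value, count) pairs, globally sorted by count.
    partitions = [[(p * 100 + i, 1000 - p * 100 - i) for i in range(12)] for p in range(4)]
    print(len(take_per_partition(partitions, 10)))  # 40
    print(len(take_first(partitions, 10)))          # 10
```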
Re: Spark SQL: The cached columnar table is not columnar?
The cache command caches the entire table, with each column stored in its own byte buffer. When querying the data, only the columns that you are asking for are scanned in memory. I'm not sure what mechanism spark is using to report the amount of data read. If you want to read only the data that you are looking for off of the disk, I'd suggest looking at parquet. On Wed, Jan 7, 2015 at 1:37 AM, Xuelin Cao xuelin...@yahoo.com.invalid wrote: Hi, Curious and curious. I'm puzzled by the Spark SQL cached table. Theoretically, the cached table should be a columnar table, and only the columns included in my SQL should be scanned. However, in my test, I always see the whole table being scanned even though I only select one column in my SQL. Here is my code:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
sqlContext.jsonFile("/data/ad.json").registerTempTable("adTable")
sqlContext.cacheTable("adTable") //The table has 10 columns
//First run, cache the table into memory
sqlContext.sql("select * from adTable").collect
//Second run, only one column is used. It should only scan a small fraction of data
sqlContext.sql("select adId from adTable").collect
sqlContext.sql("select adId from adTable").collect
sqlContext.sql("select adId from adTable").collect
What I found is that every time I run the SQL, the web UI shows the same total amount of input data: the size of the whole table. Is anything wrong? My expectation is:
1. The cached table is stored as a columnar table.
2. Since I only need one column in my SQL, the total amount of input data shown in the web UI should be very small.
But that is not what I found at all. Why? Thanks
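Michael's description of the in-memory layout can be illustrated with a toy model: with one buffer per column, a projection only touches the buffers it names. The Python class below is a hypothetical sketch, not Spark SQL's actual in-memory format; it simply counts how many stored values each `select` reads. Since Michael is unsure how the web UI computes its input figure, that number may not reflect this kind of column pruning.

```python
class ToyColumnarTable:
    """Toy columnar store: each column lives in its own list ("buffer")."""

    def __init__(self, rows, columns):
        self.columns = {name: [row[i] for row in rows] for i, name in enumerate(columns)}
        self.values_scanned = 0

    def select(self, *names):
        """Project the named columns, touching only their buffers."""
        buffers = [self.columns[name] for name in names]
        self.values_scanned += sum(len(buf) for buf in buffers)
        return list(zip(*buffers))


if __name__ == "__main__":
    columns = ["c%d" % i for i in range(10)]
    rows = [tuple(r * 10 + c for c in range(10)) for r in range(1000)]
    table = ToyColumnarTable(rows, columns)
    table.select("c0")
    print(table.values_scanned)  # 1000
    table.select(*columns)
    print(table.values_scanned)  # 11000 (1000 + 10000)
```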
Re: What does (### skipped) mean in the Spark UI?
Ah I see - So it's more like 're-used stages', which is not necessarily a bug in the program or something like that. Thanks for the pointer to the comment. Thanks Shivaram On Wed, Jan 7, 2015 at 2:00 PM, Mark Hamstra m...@clearstorydata.com wrote: That's what you want to see. The computation of a stage is skipped if the results for that stage are still available from the evaluation of a prior job run: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala#L163 On Wed, Jan 7, 2015 at 12:32 PM, Corey Nolet cjno...@gmail.com wrote: Sorry- replace ### with an actual number. What does a skipped stage mean? I'm running a series of jobs and it seems like after a certain point, the number of skipped stages is larger than the number of actual completed stages. On Wed, Jan 7, 2015 at 3:28 PM, Ted Yu yuzhih...@gmail.com wrote: Looks like the number of skipped stages couldn't be formatted. Cheers On Wed, Jan 7, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote: We just upgraded to Spark 1.2.0 and we're seeing this in the UI.
Re: Spark with Cassandra - Shuffle opening too many files
General ideas regarding too many open files:
- Make sure ulimit is actually being set, especially if you're on mesos (because of https://issues.apache.org/jira/browse/MESOS-123)
- Find the pid of the executor process, and cat /proc/pid/limits
- set spark.shuffle.consolidateFiles = true
- try spark.shuffle.manager = sort
On Wed, Jan 7, 2015 at 3:06 PM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hello, We are currently running our data pipeline on Spark, which uses Cassandra as the data source. We are facing an issue with the step where we create an RDD on data in a Cassandra table and then run flatMapToPair to transform the data: we are running into "Too many open files". I have already increased the file limits on all the worker and master nodes to 65K by changing the file /etc/system/limits.conf, but that did not help. Is there some setting we can use to restrict the shuffle? Also, when we use the log4j.properties in the conf folder, these logs are not getting emitted.
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 1.0 failed 4 times, most recent failure: Lost task 20.3 in stage 1.0 (TID 51, ip-10-87-36-147.us-west-2.aws.neustar.com): java.io.FileNotFoundException: /tmp/spark-local-20150107203209-9333/2f/shuffle_0_20_1017 (Too many open files) java.io.FileOutputStream.open(Native Method) java.io.FileOutputStream.<init>(FileOutputStream.java:221) org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192) org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67) org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Thanks Regards Ankur
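A rough back-of-envelope model helps show why the hash shuffle hits the open-file limit and why Cody's two settings help. It assumes that Spark 1.x's hash shuffle writes one file per (map task, reduce partition) pair, that `spark.shuffle.consolidateFiles=true` reuses one group of per-reducer files per concurrently running task slot, and that the sort shuffle writes one data file plus one index file per map task. These are simplified estimates (they ignore spills and the sort shuffle's bypass path for small partition counts), and all the job sizes below are hypothetical. The last function is the one that bites here: with the hash shuffle each running map task may hold a writer open for every reduce partition, so an executor can need cores × reducers handles at once, far above a common default limit of 1024 if the raised ulimit is not actually applied to the executor process (Cody's first point).

```python
def hash_shuffle_files(map_tasks, reduce_partitions):
    """Hash shuffle without consolidation: one file per (map task, reducer) pair."""
    return map_tasks * reduce_partitions


def consolidated_shuffle_files(executors, cores_per_executor, reduce_partitions):
    """consolidateFiles=true: roughly one group of per-reducer files per
    concurrently running task slot, reused by later map tasks."""
    return executors * cores_per_executor * reduce_partitions


def sort_shuffle_files(map_tasks):
    """Sort shuffle: one data file plus one index file per map task."""
    return 2 * map_tasks


def hash_open_writers_per_executor(cores_per_executor, reduce_partitions):
    """Hash shuffle: each running map task may hold up to one open writer per
    reducer, so one executor can need up to cores * reducers file handles."""
    return cores_per_executor * reduce_partitions


if __name__ == "__main__":
    # Hypothetical job: 2000 map tasks, 2000 reduce partitions, 10 executors x 8 cores.
    print(hash_shuffle_files(2000, 2000))           # 4000000
    print(consolidated_shuffle_files(10, 8, 2000))  # 160000
    print(sort_shuffle_files(2000))                 # 4000
    print(hash_open_writers_per_executor(8, 2000))  # 16000
```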
ScalaReflectionException when using saveAsParquetFile in sbt
I'm on Spark 1.2.0, with Scala 2.11.2, and SBT 0.13.7. When running:
case class Test(message: String)
val sc = new SparkContext("local", "shell")
val sqlContext = new SQLContext(sc)
import sqlContext._
val testing = sc.parallelize(List(Test("this"), Test("is"), Test("a"), Test("test")))
testing.saveAsParquetFile("test")
I get the following error: scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with java.net.URLClassLoader@64fc9229 of type class java.net.URLClassLoader with classpath [file:/Users/andrew/.ivy2/cache/org.scala-lang/scala-library/jars/scala-library-2.11.2.jar,file:/Users/andrew/.ivy2/cache/org.scala-lang/scala-compiler/jars/scala-compiler-2.11.2.jar,file:/Users/andrew/.ivy2/cache/org.scala-lang/scala-reflect/jars/scala-reflect-2.11.2.jar,file:/Users/andrew/.ivy2/cache/org.scala-lang.modules/scala-xml_2.11/bundles/scala-xml_2.11-1.0.2.jar,file:/Users/andrew/.ivy2/cache/org.scala-lang.modules/scala-parser-combinators_2.11/bundles/scala-parser-combinators_2.11-1.0.2.jar,file:/Users/andrew/.ivy2/cache/jline/jline/jars/jline-2.12.jar] and parent being xsbt.boot.BootFilteredLoader@1f421ab0 of type class xsbt.boot.BootFilteredLoader with classpath [unknown] and parent being sun.misc.Launcher$AppClassLoader@372f2b32 of type class sun.misc.Launcher$AppClassLoader with classpath [file:/usr/local/Cellar/sbt/0.13.7/libexec/sbt-launch.jar] and parent being sun.misc.Launcher$ExtClassLoader@79bcfbeb of type class sun.misc.Launcher$ExtClassLoader with classpath
[file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/dnsns.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/localedata.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/sunec.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/zipfs.jar,file:/System/Library/Java/Extensions/AppleScriptEngine.jar,file:/System/Library/Java/Extensions/dns_sd.jar,file:/System/Library/Java/Extensions/j3daudio.jar,file:/System/Library/Java/Extensions/j3dcore.jar,file:/System/Library/Java/Extensions/j3dutils.jar,file:/System/Library/Java/Extensions/jai_codec.jar,file:/System/Library/Java/Extensions/jai_core.jar,file:/System/Library/Java/Extensions/libAppleScriptEngine.jnilib,file:/System/Library/Java/Extensions/libJ3D.jnilib,file:/System/Library/Java/Extensions/libJ3DAudio.jnilib,file:/System/Library/Java/Extensions/libJ3DUtils.jnilib,file:/System/Library/Java/Extensions/libmlib_jai.jnilib,file:/System/Library/Java/Extensions/libQTJNative.jnilib,file:/System/Library/Java/Extensions/mlibwrapper_jai.jar,file:/System/Library/Java/Extensions/MRJToolkit.jar,file:/System/Library/Java/Extensions/QTJava.zip,file:/System/Library/Java/Extensions/vecmath.jar,file:/usr/lib/java/libjdns_sd.jnilib] and parent being primordial classloader with boot classpath 
[/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/JObjC.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/classes] not found. at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123) at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22) at org.apache.spark.sql.catalyst.ScalaReflection$$typecreator1$1.apply(ScalaReflection.scala:115) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232) at scala.reflect.api.TypeTags$class.typeOf(TypeTags.scala:341) at scala.reflect.api.Universe.typeOf(Universe.scala:61) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:115) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:100) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33) at org.apache.spark.sql.catalyst.ScalaReflection$class.attributesFor(ScalaReflection.scala:94) at org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:33) at
Spark Streaming with Listening Server Socket
I'm new to Spark Streaming. From the programming guide I saw there is the JavaStreamingContext.socketTextStream() API, which connects to a server and grabs the content to process. My requirement is slightly different: I used to have a listening server that receives (rather than goes out to grab) messages from an external source, then processes each message. I'm wondering if Spark Streaming can support this kind of socket communication, i.e. start a listening server socket at a port, then process each message it receives. I'm not sure if my understanding of the semantics of the JavaStreamingContext.socketTextStream() API is correct, so I'm not sure if my question above is valid or not. I'll greatly appreciate any help.
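The reading of `socketTextStream()` in the question matches the Spark Streaming programming guide: the receiver connects out as a TCP client and reads newline-delimited text, and the guide's quick example runs `nc -lk 9999` as the listening side. The minimal Python sketch below plays that `nc` role to show where the listening socket lives in this model: in your own process, not in Spark. The host, port, and helper names are hypothetical, and `read_lines` only stands in for the receiver; in a real setup, messages arriving from the external source would be written to whichever client connects.

```python
import socket
import threading


def start_line_server(lines, host="127.0.0.1", port=0):
    """Listen on host:port and send `lines` (newline-terminated, UTF-8) to the
    first client that connects, roughly what `nc -lk` does for the receiver.
    Returns the bound port and the serving thread."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def serve():
        with server:
            conn, _addr = server.accept()
            with conn:
                for line in lines:
                    conn.sendall((line + "\n").encode("utf-8"))

    thread = threading.Thread(target=serve, daemon=True)
    thread.start()
    return bound_port, thread


def read_lines(host, port):
    """Stand-in for the receiver side: connect out and read lines until EOF."""
    data = b""
    with socket.create_connection((host, port)) as sock:
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                break
            data += chunk
    return data.decode("utf-8").splitlines()
```

Usage: `port, _ = start_line_server(["hello", "spark"])`, then point the stream (or `read_lines("127.0.0.1", port)`) at that host and port.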
Re: Spark with Cassandra - Shuffle opening too many files
Thank you Cody!! I am going to try the two settings you mentioned. We are currently running with the Spark standalone cluster manager. Thanks Regards Ankur On Wed, Jan 7, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org wrote: General ideas regarding too many open files:
- Make sure ulimit is actually being set, especially if you're on mesos (because of https://issues.apache.org/jira/browse/MESOS-123)
- Find the pid of the executor process, and cat /proc/pid/limits
- set spark.shuffle.consolidateFiles = true
- try spark.shuffle.manager = sort
On Wed, Jan 7, 2015 at 3:06 PM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hello, We are currently running our data pipeline on Spark, which uses Cassandra as the data source. We are facing an issue with the step where we create an RDD on data in a Cassandra table and then run flatMapToPair to transform the data: we are running into "Too many open files". I have already increased the file limits on all the worker and master nodes to 65K by changing the file /etc/system/limits.conf, but that did not help. Is there some setting we can use to restrict the shuffle? Also, when we use the log4j.properties in the conf folder, these logs are not getting emitted.
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 1.0 failed 4 times, most recent failure: Lost task 20.3 in stage 1.0 (TID 51, ip-10-87-36-147.us-west-2.aws.neustar.com): java.io.FileNotFoundException: /tmp/spark-local-20150107203209-9333/2f/shuffle_0_20_1017 (Too many open files) java.io.FileOutputStream.open(Native Method) java.io.FileOutputStream.<init>(FileOutputStream.java:221) org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192) org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67) org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Thanks Regards Ankur
Re: What does (### skipped) mean in the Spark UI?
That's what you want to see. The computation of a stage is skipped if the results for that stage are still available from the evaluation of a prior job run: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala#L163 On Wed, Jan 7, 2015 at 12:32 PM, Corey Nolet cjno...@gmail.com wrote: Sorry- replace ### with an actual number. What does a skipped stage mean? I'm running a series of jobs and it seems like after a certain point, the number of skipped stages is larger than the number of actual completed stages. On Wed, Jan 7, 2015 at 3:28 PM, Ted Yu yuzhih...@gmail.com wrote: Looks like the number of skipped stages couldn't be formatted. Cheers On Wed, Jan 7, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote: We just upgraded to Spark 1.2.0 and we're seeing this in the UI.
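Corey's observation (skipped stages eventually outnumbering completed ones) follows from the rule in the linked code. Below is a toy Python model, not Spark's scheduler: each job lists the shuffles its map stages write, a map stage is counted as skipped when an earlier job already materialized that shuffle output, and each job's final result stage always runs. It assumes shuffle outputs are never lost, which a real cluster does not guarantee.

```python
def run_jobs(jobs):
    """Count (completed, skipped) stages for a sequence of jobs.

    Each job is a list of shuffle ids it depends on; one result stage per job
    always runs. A shuffle map stage is skipped if its output already exists.
    """
    materialized = set()
    completed = skipped = 0
    for shuffle_deps in jobs:
        for shuffle_id in shuffle_deps:
            if shuffle_id in materialized:
                skipped += 1
            else:
                materialized.add(shuffle_id)
                completed += 1
        completed += 1  # the job's result stage
    return completed, skipped


# Hypothetical: three actions on the same RDD that sits behind two shuffles.
print(run_jobs([["s1", "s2"], ["s1", "s2"], ["s1", "s2"]]))  # (5, 4)
```

After the first job, every later action on the same lineage reuses both shuffle outputs, so skipped stages grow faster than completed ones.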
Re: Spark SQL: The cached columnar table is not columnar?
Thanks Michael.

2015-01-08 6:04 GMT+08:00 Michael Armbrust mich...@databricks.com:
The cache command caches the entire table, with each column stored in its own byte buffer. When querying the data, only the columns that you are asking for are scanned in memory. I'm not sure what mechanism Spark is using to report the amount of data read. If you want to read only the data that you are looking for off of the disk, I'd suggest looking at Parquet.

On Wed, Jan 7, 2015 at 1:37 AM, Xuelin Cao xuelin...@yahoo.com.invalid wrote:
Hi, Curious and curious. I'm puzzled by the Spark SQL cached table. Theoretically, the cached table should be a columnar table, and only the columns included in my SQL should be scanned. However, in my test, I always see the whole table being scanned even though I only select one column in my SQL. Here is my code:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
sqlContext.jsonFile("/data/ad.json").registerTempTable("adTable")
sqlContext.cacheTable("adTable") //The table has 10 columns
//First run, cache the table into memory
sqlContext.sql("select * from adTable").collect
//Second run, only one column is used. It should only scan a small fraction of data
sqlContext.sql("select adId from adTable").collect
sqlContext.sql("select adId from adTable").collect
sqlContext.sql("select adId from adTable").collect
What I found is that every time I run the SQL, the web UI shows the same total amount of input data: the size of the whole table. Is anything wrong? My expectation is:
1. The cached table is stored as a columnar table.
2. Since I only need one column in my SQL, the total amount of input data shown in the web UI should be very small.
But that is not what I found at all. Why? Thanks
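Michael's point about per-column buffers can be illustrated with a toy Python model (not Spark SQL's actual cache format): a table stored as one list per column lets a single-column query touch only that column's values. The input size the web UI reports is a separate metric, which this sketch does not model. Column names other than `adId` are hypothetical.

```python
def to_columnar(rows, columns):
    """Store row dicts as one list per column (a toy stand-in for per-column buffers)."""
    return {c: [row[c] for row in rows] for c in columns}


def scan(table, wanted):
    """Return the requested columns and the number of stored values touched."""
    touched = sum(len(table[c]) for c in wanted)
    return {c: table[c] for c in wanted}, touched


# 10 columns as in the question; names other than adId are hypothetical.
columns = ["adId"] + ["col%d" % i for i in range(1, 10)]
rows = [{c: i for c in columns} for i in range(100)]
table = to_columnar(rows, columns)

result, touched = scan(table, ["adId"])
total = sum(len(values) for values in table.values())
print(touched, total)  # 100 1000
```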
When will spark support push style shuffle?
Hi, I've heard a lot of complaints about Spark's pull-style shuffle. Is there any plan to support push-style shuffle in the near future? Currently, the shuffle phase must be completed before the next stage starts, whereas in Impala the shuffled data is reportedly streamed to the next stage's handler, which saves a lot of time. Will Spark support this mechanism one day? Thanks
Spark on teradata?
Hi, I have a stupid question: is it possible to use Spark on a Teradata data warehouse? I read some news on the internet which says yes. However, I didn't find any example of this. Thanks in advance. Cheers, Gen
RE: FW: No APPLICATION_COMPLETE file created in history server log location upon pyspark job success
Thanks Andrew, simple fix ☺.

From: Andrew Ash [mailto:and...@andrewash.com]
Sent: 07 January 2015 15:26
To: England, Michael (IT/UK)
Cc: user
Subject: Re: FW: No APPLICATION_COMPLETE file created in history server log location upon pyspark job success

Hi Michael, I think you need to explicitly call sc.stop() on the Spark context for it to close down properly (this doesn't happen automatically). See https://issues.apache.org/jira/browse/SPARK-2972 for more details. Andrew

On Wed, Jan 7, 2015 at 3:38 AM, michael.engl...@nomura.com wrote:
Hi, I am currently running pyspark jobs against Spark 1.1.0 on YARN. When I run example Java jobs such as spark-pi, the following files get created:

bash-4.1$ tree spark-pi-1420624364958
spark-pi-1420624364958
├── APPLICATION_COMPLETE
├── EVENT_LOG_1
└── SPARK_VERSION_1.1.0

0 directories, 3 files

However, when I run my pyspark job, no APPLICATION_COMPLETE file gets created.

bash-4.1$ tree pyspark-1420628130353
pyspark-1420628130353
├── EVENT_LOG_1
└── SPARK_VERSION_1.1.0

0 directories, 2 files

If I touch the file into this directory, the job just appears as not started in the history server UI. I am submitting jobs using spark-submit for now:

bin/spark-submit --master yarn-client --executor-memory 4G --executor-cores 12 --num-executors 10 --queue highpriority <path to python file>

Is there a setting I am missing for this APPLICATION_COMPLETE file to be created when a pyspark job completes? Thanks, Michael

This e-mail (including any attachments) is private and confidential, may contain proprietary or privileged information and is intended for the named recipient(s) only. Unintended recipients are strictly prohibited from taking action on the basis of information in this e-mail and must contact the sender immediately, delete this e-mail (and all attachments) and destroy any hard copies.
Nomura will not accept responsibility or liability for the accuracy or completeness of, or the presence of any virus or disabling code in, this e-mail. If verification is sought please request a hard copy. Any reference to the terms of executed transactions should be treated as preliminary only and subject to formal written confirmation by Nomura. Nomura reserves the right to retain, monitor and intercept e-mail communications through its networks (subject to and in accordance with applicable laws). No confidentiality or privilege is waived or lost by Nomura by any mistransmission of this e-mail. Any reference to Nomura is a reference to any entity in the Nomura Holdings, Inc. group. Please read our Electronic Communications Legal Notice which forms part of this e-mail: http://www.Nomura.com/email_disclaimer.htm
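Andrew's fix is to stop the context explicitly. A minimal Python sketch of the pattern uses `try`/`finally` so that `stop()` also runs when the job raises. `FakeContext` and `run_app` are hypothetical helpers so the example runs without Spark; with pyspark you would pass a factory that builds a real `SparkContext` instead.

```python
class FakeContext:
    """Hypothetical stand-in for pyspark.SparkContext; it only records stop()."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


def run_app(make_context, job):
    """Create a context, run `job` with it, and always stop the context,
    even if the job raises."""
    sc = make_context()
    try:
        return job(sc)
    finally:
        # In the thread above, calling stop() explicitly is what let the
        # application be recorded as complete for the history server.
        sc.stop()


# With real pyspark the call would look something like:
#   from pyspark import SparkContext
#   run_app(lambda: SparkContext(appName="my-job"), main)
ctx = FakeContext()
print(run_app(lambda: ctx, lambda sc: 42), ctx.stopped)  # 42 True
```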
Re: Strange DAG scheduling behavior on currently dependent RDDs
I asked this question too soon. I am caching off a bunch of RDDs in a TrieMap so that our framework can wire them together, and the locking was not completely correct; therefore it was at times creating multiple new RDDs instead of using the cached versions, which created completely separate lineages. What's strange is that this bug only surfaced when I updated Spark.

On Wed, Jan 7, 2015 at 9:12 AM, Corey Nolet cjno...@gmail.com wrote:
We just updated to Spark 1.2.0 from Spark 1.1.0. We have a small framework that we've been developing that connects various RDDs together based on some predefined business cases. After updating to 1.2.0, some of our concurrency expectations about how the stages within jobs are executed have changed quite significantly. Given 3 RDDs:
RDD1 = inputFromHDFS().groupBy().sortBy().etc().cache()
RDD2 = RDD1.outputToFile
RDD3 = RDD1.groupBy().outputToFile
In Spark 1.1.0, we expected RDD1 to be scheduled based on the first stage encountered (RDD2's outputToFile or RDD3's groupBy()) and then for RDD2 and RDD3 to both block waiting for RDD1 to complete and cache, at which point RDD2 and RDD3 both use the cached version to complete their work. Spark 1.2.0 seems to schedule two stages (albeit running concurrently) for each of RDD1's stages (inputFromHDFS, groupBy(), sortBy(), etc() will each get run twice). It does not look like there is any sharing of the results between these jobs. Are we doing something wrong? Is there a setting that I'm not understanding somewhere?
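The bug Corey found, a shared RDD cache whose locking let two callers each build their own RDD, is the classic check-then-insert race. Below is a minimal Python sketch of a get-or-create cache that holds one lock across both the check and the creation, so each key's factory runs at most once. The class, key, and factory names are hypothetical; the same idea applies to a Scala TrieMap guarded by a lock.

```python
import threading


class GetOrCreate:
    """Thread-safe get-or-create cache: the factory for a key runs at most once,
    so every caller receives the same object (e.g. the same cached RDD)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._items = {}

    def get(self, key, factory):
        # Check and insert under one lock; checking outside the lock is the
        # race that lets two threads each build their own copy.
        with self._lock:
            if key not in self._items:
                self._items[key] = factory()
            return self._items[key]


created = []


def build_rdd1():
    """Hypothetical factory; with Spark this would define and cache() an RDD."""
    created.append(1)
    return object()


cache = GetOrCreate()
results = []
workers = [threading.Thread(target=lambda: results.append(cache.get("RDD1", build_rdd1)))
           for _ in range(8)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(len(created), all(r is results[0] for r in results))  # 1 True
```

Holding the lock while the factory runs serializes creation, which is usually acceptable here because defining an RDD is lazy and cheap.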
Spark History Server can't read event logs
Hi, When I run jobs and save the event logs, they are saved with the permissions of the Unix user and group that ran the Spark job. The history server is run as a service account and therefore can’t read the files. Extract from the history server logs:

2015-01-07 15:37:24,3021 ERROR Client fs/client/fileclient/cc/client.cc:1009 Thread: 1183 User does not have access to open file /apps/spark/historyserver/logs/spark-1420644521194
15/01/07 15:37:24 ERROR ReplayListenerBus: Exception in parsing Spark event log /apps/spark/historyserver/logs/spark-1420644521194/EVENT_LOG_1
org.apache.hadoop.security.AccessControlException: Open failed for file: /apps/spark/historyserver/logs/spark-1420644521194/EVENT_LOG_1, error: Permission denied (13)

Is there a setting I can change that makes the files world-readable, or at least readable by the account running the history server? Currently, the job appears in the History Server UI but only states ‘Not Started’. Thanks, Michael
Re: [MLLib] storageLevel in ALS
I guess I can, but it would be nicer if that were made configurable. I can create the issue, test, and open a PR if you think it's appropriate.

On Wed, Jan 7, 2015 at 1:41 PM, Sean Owen so...@cloudera.com wrote:
Ah, Fernando means the usersOut / productsOut RDDs, not the intermediate links RDDs. Can you unpersist() them, and persist() again at the desired level? The downside is that this might mean recomputing and repersisting the RDDs.

On Wed, Jan 7, 2015 at 5:11 AM, Xiangrui Meng men...@gmail.com wrote:
Which Spark version are you using? We made this configurable in 1.1: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L202 -Xiangrui

On Tue, Jan 6, 2015 at 12:57 PM, Fernando O. fot...@gmail.com wrote:
Hi, I was doing some tests with ALS and I noticed that if I persist the inner RDDs from a MatrixFactorizationModel, the RDD is not replicated; it seems like the storage level is hardcoded to MEMORY_AND_DISK. Do you think it makes sense to make that configurable?
streaming application throws IOException due to Log directory already exists during checkpoint recovery
Hi All, I run a Spark Streaming application (Spark 1.2.0) on YARN (Hadoop 2.5.2) with the Spark event log enabled. I set the checkpoint dir in the streaming context and ran the app. It started in YARN with application id 'app_id_1' and created the Spark event log dir /spark/applicationHistory/app_id_1. I killed the app and reran it with the same checkpoint dir; this time it had a different YARN application id, 'app_id_2'. However, the rerun failed because the log directory already exists:

Exception in thread "Driver" java.io.IOException: Log directory hdfs://xxx:8020/spark/applicationHistory/app_id_1 already exists!
    at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:129)
    at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
    at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:353)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:118)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:561)
    at org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:566)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
    at com.xxx.spark.streaming.JavaKafkaSparkHbase.main(JavaKafkaSparkHbase.java:121)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)

Is this the expected behavior? When recovering from the checkpoint, shouldn't an event log dir named after the new application id be created (in the above example, the rerun should create /spark/applicationHistory/app_id_2)? Thanks, Max
Re: Saving partial (top 10) DStream windows to hdfs
Hi, it worked out like this:
val topCounts = sortedCounts.transform(rdd => rdd.zipWithIndex().filter(x => x._2 < 10))
Regards, Laeeq

On Wednesday, January 7, 2015 5:25 PM, Laeeq Ahmed laeeqsp...@yahoo.com.INVALID wrote:
Hi Yana, I also think that val top10 = your_stream.mapPartitions(rdd => rdd.take(10)) will give the top 10 from each partition. I will try your code. Regards, Laeeq

On Wednesday, January 7, 2015 5:19 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote:
My understanding is that val top10 = your_stream.mapPartitions(rdd => rdd.take(10)) would result in an RDD containing the top 10 entries per partition -- am I wrong? I am not sure if there is a more efficient way, but I think this would work: sortedCounts.zipWithIndex().filter(x => x._2 < 10).saveAsText

On Wed, Jan 7, 2015 at 10:38 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote:
Hi, I applied it as follows:
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
val topCounts = sortedCounts.mapPartitions(rdd => rdd.take(10))
//val topCounts = sortedCounts.transform(rdd => ssc.sparkContext.makeRDD(rdd.take(10)))
topCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
topCounts.print()
It gives the output with 10 extra values. I think it works on each partition of the RDD rather than on the whole RDD. I also tried the commented code. It gives the correct result, but at the start it gives a serialisation error:
ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Output for code in red: The values in green look extra to me.
0,578 -3,576 4,559 -1,556 3,553 -6,540 6,538 -4,535 1,526 10,483 94,8 -113,8 -137,8 -85,8 -91,8 -121,8 114,8 108,8 93,8 101,8 1,128 -8,118 3,112 -4,110 -13,108 4,108 -3,107 -10,107 -6,106 8,105 76,6 74,6 60,6 52,6 70,6 71,6 -60,6 55,6 78,5 64,5 and so on.
Regards, Laeeq

On Tuesday, January 6, 2015 9:06 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
You can try something like: val top10 = your_stream.mapPartitions(rdd => rdd.take(10))
Thanks, Best Regards

On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote:
Hi, I am counting values in each window and finding the top values, and I want to save only the 10 most frequent values of each window to HDFS rather than all the values.
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
//sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
I can print the top 10 as above in red. I have also tried
sortedCounts.foreachRDD { rdd => ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1)) }
but I get the following error:
15/01/05 17:12:23 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Regards, Laeeq
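The extra values come from the difference between the two approaches in this thread: `mapPartitions(rdd => rdd.take(10))` takes 10 elements from every partition, whereas the `zipWithIndex` filter keeps the first 10 elements of the whole RDD, which `sortByKey` has already ordered across partitions. A small Python model of both behaviours follows; the sample data is a shortened excerpt of the output above, split into two hypothetical partitions. In Scala, `zipWithIndex` yields `(element, index)` pairs, so a `.map(_._1)` after the filter would be needed to save only the original pairs; the Python model drops the index directly.

```python
from itertools import chain, islice


def take_per_partition(partitions, n):
    """Model of mapPartitions(it => it.take(n)): up to n items from EACH partition."""
    return [list(islice(part, n)) for part in partitions]


def take_global(partitions, n):
    """Model of zipWithIndex().filter(_._2 < n).map(_._1): the first n items of
    the whole RDD, in partition order (globally sorted after sortByKey)."""
    indexed = enumerate(chain.from_iterable(partitions))
    return [item for index, item in indexed if index < n]


# Shortened excerpt of the (value, count) output above, as two partitions.
partitions = [
    [(0, 578), (-3, 576), (4, 559), (-1, 556)],
    [(94, 8), (-113, 8), (-137, 8)],
]
print(take_per_partition(partitions, 3))  # 3 items from each partition, 6 in total
print(take_global(partitions, 3))         # [(0, 578), (-3, 576), (4, 559)]
```

One design note: on an RDD with more than one partition, `zipWithIndex` runs an extra Spark job to count the elements in each partition before assigning indices.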
Re: Saving partial (top 10) DStream windows to hdfs
Oh yeah. In that case you can simply repartition it into 1 partition and do mapPartitions:
val top10 = mystream.repartition(1).mapPartitions(rdd => rdd.take(10))

On 7 Jan 2015 21:08, Laeeq Ahmed laeeqsp...@yahoo.com wrote:
Hi, I applied it as follows:
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
val topCounts = sortedCounts.mapPartitions(rdd => rdd.take(10))
//val topCounts = sortedCounts.transform(rdd => ssc.sparkContext.makeRDD(rdd.take(10)))
topCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
topCounts.print()
It gives the output with 10 extra values. I think it works on each partition of the RDD rather than on the whole RDD. I also tried the commented code. It gives the correct result, but at the start it gives a serialisation error:
ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Output for code in red: The values in green look extra to me.
0,578 -3,576 4,559 -1,556 3,553 -6,540 6,538 -4,535 1,526 10,483 *94,8* *-113,8* *-137,8* *-85,8* *-91,8* *-121,8* *114,8* *108,8* *93,8* *101,8* 1,128 -8,118 3,112 -4,110 -13,108 4,108 -3,107 -10,107 -6,106 8,105 *76,6* *74,6* *60,6* *52,6* *70,6* *71,6* *-60,6* *55,6* *78,5* *64,5* and so on.
Regards, Laeeq

On Tuesday, January 6, 2015 9:06 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
You can try something like: val top10 = your_stream.mapPartitions(rdd => rdd.take(10))
Thanks, Best Regards

On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote:
Hi, I am counting values in each window and finding the top values, and I want to save only the 10 most frequent values of each window to HDFS rather than all the values.
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
//sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
I can print the top 10 as above in red. I have also tried
sortedCounts.foreachRDD { rdd => ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1)) }
but I get the following error:
15/01/05 17:12:23 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Regards, Laeeq
Re: KafkaUtils not consuming all the data from all partitions
AFAIK, there are two levels of parallelism related to the Spark Kafka consumer:
At JVM level: For each receiver, one can specify the number of threads for a given topic, provided as a map [topic -> nthreads]. This will effectively start n JVM threads consuming partitions of that Kafka topic.
At cluster level: One can create several DStreams, and each will have one receiver and use 1 executor core in Spark; each DStream will have its receiver as defined in the previous line.
What you need to ensure is that there's a consumer attached to each partition of your Kafka topic. That is, nthreads * nReceivers = #kafka_partitions(topic)
e.g.: Given
nPartitions = #partitions of your topic
nThreads = #threads per receiver
val kafkaStreams = (1 to nPartitions/nThreads).map { i => KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> nThreads), StorageLevel.MEMORY_ONLY_SER) }
For this to work, you need at least (nPartitions/nThreads + 1) cores in your Spark cluster, although I would recommend having 2-3x (nPartitions/nThreads). (And don't forget to union the streams after creation.)
-kr, Gerard.

On Wed, Jan 7, 2015 at 4:43 PM, francois.garil...@typesafe.com wrote:
- You are launching up to 10 threads/topic per Receiver. Are you sure your receivers can support 10 threads each? (i.e. in the default configuration, do they have 10 cores?) If they have 2 cores, that would explain why this works with 20 partitions or fewer.
- If you have 90 partitions, why start 10 Streams, each consuming 10 partitions, and then remove the stream at index 0? Why not simply start 10 streams with 9 partitions? Or, more simply,
val kafkaStreams = (1 to numPartitions).map { _ => KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER) }
- You're consuming up to 10 local threads *per topic*, on each of your 10 receivers. That's a lot of threads (10 * size of kafkaTopicsList) co-located on a single machine. You mentioned having a single Kafka topic with 90 partitions.
Why not have a single-element topicMap? -- FG

On Wed, Jan 7, 2015 at 4:05 PM, Mukesh Jha me.mukesh@gmail.com wrote:
I understand that I have to create 10 parallel streams. My code runs fine when the number of partitions is ~20, but when I increase the number of partitions I keep running into this issue. Below is my code to create the Kafka streams, along with the configs used.
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "3");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "2000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "1");
kafkaConf.put("rebalance.max.retries", "20");
String[] topics = kafkaTopicsList;
int numStreams = numKafkaThreads; // this is *10*
Map<String, Integer> topicMap = new HashMap<>();
for (String topic : topics) {
  topicMap.put(topic, numStreams);
}
List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()));
}
JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);

On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas gerard.m...@gmail.com wrote:
Hi, Could you add the code where you create the Kafka consumer? -kr, Gerard.

On Wed, Jan 7, 2015 at 3:43 PM, francois.garil...@typesafe.com wrote:
Hi Mukesh, If my understanding is correct, each Stream only has a single Receiver.
So, if you have each receiver consuming 9 partitions, you need 10 input DStreams to create 10 concurrent receivers: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving Would you mind sharing a bit more on how you achieve this? -- FG

On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com wrote:
Hi Guys, I have a Kafka topic with 90 partitions, and I am running Spark Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10 Kafka receivers. My streaming job is running fine and there is no delay in processing; it's just that some partitions' data is never getting picked up. From the Kafka console I can see that each receiver is consuming data from 9 partitions, but the lag for some offsets keeps on increasing. Below are my Kafka consumer parameters. Have any of you faced this kind of issue? If so, do you have any pointers to fix it?
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
Spark with Hive cluster dependencies
Hi all, We have been building a system where we rely heavily on Hive queries executed through Spark to load and manipulate data, running on CDH and YARN. I have been trying to explore lighter setups where we would not have to maintain a Hadoop cluster and could run the system on Spark alone. Is it possible to run Spark standalone and set up Hive alongside it, without the Hadoop cluster? If not, any suggestions on how we can replicate the convenience of Hive tables (and Hive SQL) without Hive? Thanks,
Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD
Thanks, I found the issue: I was including /usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar in the classpath, and this was breaking it. Now I am using a custom jar with just the Python converters, and everything works like a charm. Thanks, Antony.

On Wednesday, 7 January 2015, 23:57, Sean Owen so...@cloudera.com wrote:
Yes, the distribution is certainly fine and built for Hadoop 2. It sounds like you are inadvertently including Spark code compiled for Hadoop 1 when you run your app. The general idea is to use the cluster's copy at runtime. Those with more pyspark experience might be able to give more useful directions about how to fix that.

On Wed, Jan 7, 2015 at 1:46 PM, Antony Mayi antonym...@yahoo.com wrote:
This is the official Cloudera-compiled stack, CDH 5.3.0. Nothing has been changed by me, and I presume they are pretty good at building it, so I still suspect the classpath now gets resolved in a different way? Thanks, Antony.

On Wednesday, 7 January 2015, 18:55, Sean Owen so...@cloudera.com wrote:
Problems like this are always due to having code compiled for Hadoop 1.x run against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at runtime Hadoop 2.x is used. A common cause is actually bundling Spark / Hadoop classes with your app, when the app should just use the Spark / Hadoop provided by the cluster. It could also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x cluster.

On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid wrote:
Hi, I am using newAPIHadoopRDD to load an RDD from HBase (using pyspark running as yarn-client), pretty much the standard case demonstrated in hbase_inputformat.py from the examples. The thing is that when trying the very same code on Spark 1.2, I am getting the error below, which, based on similar cases on other forums, suggests an incompatibility between MR1 and MR2. Why would this now start happening?
is that due to some changes in resolving the classpath which now picks up MR2 jars first while before it was MR1? is there any workaround for this? thanks,Antony. the error: py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.RDD.take(RDD.scala:1060) at org.apache.spark.rdd.RDD.first(RDD.scala:1093) at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202) at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500) at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at 
py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
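Sean's diagnosis (classes compiled against one Hadoop line ending up on the classpath of the other) can be checked before submitting by listing what a jar actually bundles. This is a minimal sketch, assuming the jar is a plain zip archive; the function name `bundled_framework_classes` and its default prefix are illustrative, not part of any Spark or CDH tooling. Spark's own example converters live under `org.apache.spark.examples.pythonconverters`, so the default only flags `org/apache/hadoop/` (which also covers HBase's `org/apache/hadoop/hbase/`).

```python
import zipfile

# Hypothetical default: flag Hadoop and HBase classes (HBase lives under
# org/apache/hadoop/hbase/). Pass extra prefixes such as "org/apache/spark/"
# if you also want to catch bundled Spark classes.
DEFAULT_PREFIXES = ("org/apache/hadoop/",)


def bundled_framework_classes(jar_path, prefixes=DEFAULT_PREFIXES):
    """Return the sorted .class entries in jar_path that start with any prefix."""
    with zipfile.ZipFile(jar_path) as jar:
        return sorted(
            name
            for name in jar.namelist()
            if name.endswith(".class") and name.startswith(tuple(prefixes))
        )


if __name__ == "__main__":
    import sys

    for path in sys.argv[1:]:
        hits = bundled_framework_classes(path)
        print(f"{path}: {len(hits)} bundled Hadoop/HBase classes")
        for name in hits[:20]:  # show a sample, not the whole list
            print("   ", name)
```

Running it against any jar you are about to put on the driver or executor classpath would show whether that jar carries its own copies of classes such as `org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.class`, which would then compete with the cluster's copies.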
Re: Spark Trainings/ Professional certifications
O'Reilly + Databricks do certification: http://www.oreilly.com/data/sparkcert
Databricks does training: http://databricks.com/spark-training
Cloudera does too: http://www.cloudera.com/content/cloudera/en/training/courses/spark-training.html
That said, I am not sure you need a certificate to solve your problem. If you can briefly summarize what you're modeling and what the results look like, maybe someone can spot the problem.
On Wed, Jan 7, 2015 at 2:38 PM, Saurabh Agrawal saurabh.agra...@markit.com wrote:
Hi, Can you please suggest some of the best available trainings/coaching and professional certifications in Apache Spark? We are trying to run predictive analysis on our sales data and come up with recommendations (leads). We have tried to run CF, but we end up getting absolutely bogus results! A training that would leave us hands-on enough to do our job effectively is what we are after. In addition, if this training could provide firm ground for a professional certification, that would be an added advantage. Thanks for your inputs.
Regards, Saurabh Agrawal
-- This e-mail, including accompanying communications and attachments, is strictly confidential and only for the intended recipient. Any retention, use or disclosure not expressly authorised by Markit is prohibited. This email is subject to all waivers and other terms at the following link: http://www.markit.com/en/about/legal/email-disclaimer.page Please visit http://www.markit.com/en/about/contact/contact-us.page? for contact information on our offices worldwide. MarkitSERV Limited has its registered office located at Level 4, Ropemaker Place, 25 Ropemaker Street, London, EC2Y 9LY and is authorized and regulated by the Financial Conduct Authority with registration number 207294
Re: spark-network-yarn 2.11 depends on spark-network-shuffle 2.10
This particular case shouldn't cause problems, since both of those libraries are Java-only (the Scala version suffix is only there to help the build scripts). But it does look weird, so it would be nice to fix it.
On Wed, Jan 7, 2015 at 12:25 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote:
It seems that spark-network-yarn compiled for Scala 2.11 depends on spark-network-shuffle compiled for Scala 2.10. This causes cross-version dependency conflicts in sbt. Seems like a publishing error? http://www.uploady.com/#!/download/6Yn95UZA0DR/3taAJFjCJjrsSXOR
-- Marcelo
Re: calculating the mean of SparseVector RDD
There is some serialization overhead. You can try https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107 . -Xiangrui
Re: Spark Trainings/ Professional certifications
Hi, I am sorry to bother you, but I couldn't find any information about the online test for the Spark certification managed through Kryterion. Could you please give me the link for it? Thanks a lot in advance.
Cheers, Gen
Re: Shuffle Problems in 1.2.0
Could you try it on AWS using EMR? That would give you an exact replica of the environment that causes the error.
Re: Spark Trainings/ Professional certifications
For online, use the http://www.oreilly.com/go/sparkcert link to sign up via O'Reilly. They will send details -- the announcement is being prepared.
Re: Spark History Server can't read event logs
The Spark code generates the log directory with 770 permissions. On top of that, you need to make sure of two things:
- all directories up to /apps/spark/historyserver/logs/ are readable by the user running the history server;
- the user running the history server belongs to the group that owns /apps/spark/historyserver/logs/.
I think the code could be more explicit about setting the group of the generated log directories and files, but if you follow the two rules above, things should work. Also, I recommend setting /apps/spark/historyserver/logs/ itself to 1777 so that any user can generate logs, but only the owner (or a superuser) can delete them.
On Wed, Jan 7, 2015 at 7:45 AM, michael.engl...@nomura.com wrote:
Hi, When I run jobs and save the event logs, they are saved with the permissions of the Unix user and group that ran the Spark job. The history server is run as a service account and therefore can't read the files. Extract from the history server logs:
    2015-01-07 15:37:24,3021 ERROR Client fs/client/fileclient/cc/client.cc:1009 Thread: 1183 User does not have access to open file /apps/spark/historyserver/logs/spark-1420644521194
    15/01/07 15:37:24 ERROR ReplayListenerBus: Exception in parsing Spark event log /apps/spark/historyserver/logs/spark-1420644521194/EVENT_LOG_1
    org.apache.hadoop.security.AccessControlException: Open failed for file: /apps/spark/historyserver/logs/spark-1420644521194/EVENT_LOG_1, error: Permission denied (13)
Is there a setting I can change that makes the files world-readable, or at least readable by the account running the history server? Currently, the job appears in the History Server UI but only states ‘Not Started’.
Thanks, Michael
This e-mail (including any attachments) is private and confidential, may contain proprietary or privileged information and is intended for the named recipient(s) only. Unintended recipients are strictly prohibited from taking action on the basis of information in this e-mail and must contact the sender immediately, delete this e-mail (and all attachments) and destroy any hard copies. Nomura will not accept responsibility or liability for the accuracy or completeness of, or the presence of any virus or disabling code in, this e-mail. If verification is sought please request a hard copy. Any reference to the terms of executed transactions should be treated as preliminary only and subject to formal written confirmation by Nomura. Nomura reserves the right to retain, monitor and intercept e-mail communications through its networks (subject to and in accordance with applicable laws). No confidentiality or privilege is waived or lost by Nomura by any mistransmission of this e-mail. Any reference to Nomura is a reference to any entity in the Nomura Holdings, Inc. group. Please read our Electronic Communications Legal Notice which forms part of this e-mail: http://www.Nomura.com/email_disclaimer.htm
-- Marcelo
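Marcelo's first rule and his 1777 recommendation can be sketched for a locally mounted, POSIX-style filesystem. This sketch rests on a few assumptions: it is run as the history-server user, the function names are hypothetical, and on HDFS or MapR-FS the equivalent changes would be made with `hadoop fs -chmod` / `hadoop fs -chgrp` rather than Python's `os` module. `os.access` checks read and traverse (execute) permission for whoever runs the script.

```python
import os
import stat

# 1777: rwx for owner, group and others, plus the sticky bit, so anyone can
# create entries but only an entry's owner (or root) can delete or rename it.
SHARED_LOG_DIR_MODE = stat.S_ISVTX | stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO


def make_shared_log_dir(path):
    """Create path if missing, set it to mode 1777, and return the resulting mode."""
    os.makedirs(path, exist_ok=True)
    os.chmod(path, SHARED_LOG_DIR_MODE)  # chmod is not affected by the umask
    return stat.S_IMODE(os.stat(path).st_mode)


def unreadable_dirs(path):
    """Return path and any ancestors the current user cannot list and traverse."""
    path = os.path.abspath(path)
    blocked = []
    while True:
        if not os.access(path, os.R_OK | os.X_OK):
            blocked.append(path)
        parent = os.path.dirname(path)
        if parent == path:  # reached the filesystem root
            return blocked
        path = parent
```

An empty list from `unreadable_dirs` means the first rule holds for the current user. The second rule (the history-server user belonging to the owning group) is an administrative change and is not covered here.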
Re: Shuffle Problems in 1.2.0
Hey Sven,
I tried with all of your configurations, 2 nodes with 2 executors each, but in standalone mode, and it worked fine. Could you try to narrow down which configuration change might be responsible?
Davies
On Tue, Jan 6, 2015 at 8:03 PM, Sven Krasser kras...@gmail.com wrote:
Hey Davies, Here are some more details on a configuration that causes this error for me. Launch an AWS Spark EMR cluster as follows:
    aws emr create-cluster --region us-west-1 --no-auto-terminate \
      --ec2-attributes KeyName=your-key-here,SubnetId=your-subnet-here \
      --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args='[-g]' \
      --ami-version 3.3 --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge \
      InstanceGroupType=CORE,InstanceCount=10,InstanceType=r3.xlarge --name "Spark Issue Repro" \
      --visible-to-all-users --applications Name=Ganglia
This is a 10-node cluster (not sure if this makes a difference outside of HDFS block locality). Then use this Gist as your spark-defaults file (it'll configure 2 executors per job as well): https://gist.github.com/skrasser/9b978d3d572735298d16
With that, I am seeing this again:
    2015-01-07 03:43:51,751 ERROR [Executor task launch worker-1] executor.Executor (Logging.scala:logError(96)) - Exception in task 13.0 in stage 0.0 (TID 27)
    org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@4cfae71c)
Thanks for the performance pointers -- the repro script is fairly unpolished (just enough to cause the aforementioned exception). Hope this sheds some light on the error. From what I can tell so far, something in the spark-defaults file triggers it (with other settings it completes just fine). Thanks for your help!
-Sven
On Tue, Jan 6, 2015 at 12:29 PM, Davies Liu dav...@databricks.com wrote:
I still cannot reproduce it with 2 nodes (4 CPUs). Your repro.py could be faster (10 min instead of 22 min) written as:
    inpdata.map(lambda (pc, x): (x, pc=='p' and 2 or 1)).reduceByKey(lambda x, y: x|y).filter(lambda (x, pc): pc==3).collect()
(also, no cache is needed anymore)
Davies
On Tue, Jan 6, 2015 at 9:02 AM, Sven Krasser kras...@gmail.com wrote:
The issue has been sensitive to the number of executors and input data size. I'm using 2 executors with 4 cores each, 25GB of memory, and 3800MB of memory overhead for YARN. This will fit onto Amazon r3 instance types.
-Sven
On Tue, Jan 6, 2015 at 12:46 AM, Davies Liu dav...@databricks.com wrote:
I ran your scripts on a 5-node cluster (2 CPUs, 8G mem) and cannot reproduce your failure. Should I test it with big-memory nodes?
On Mon, Jan 5, 2015 at 4:00 PM, Sven Krasser kras...@gmail.com wrote:
Thanks for the input! I've managed to come up with a repro of the error with test data only (and without any of the custom code in the original script), please see here: https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md
The Gist contains a data generator and the script reproducing the error (plus driver and executor logs). If I run using full cluster capacity (32 executors with 28GB), there are no issues. If I run on only two, the error appears again and the job fails:
    org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@294b55b7)
Any thoughts, or any obvious problems you can spot by any chance? Thank you!
-Sven
On Sun, Jan 4, 2015 at 1:11 PM, Josh Rosen rosenvi...@gmail.com wrote:
It doesn’t seem like there are a whole lot of clues to go on here without seeing the job code. The original “org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@130dc7ad)” error suggests that maybe there’s an issue with PySpark’s serialization / tracking of types, but it’s hard to say from this error trace alone.
On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com) wrote:
Hey Josh, I am still trying to prune this to a minimal example, but it has been tricky since scale seems to be a factor. The job runs over ~720GB of data (the cluster's total RAM is around ~900GB, split across 32 executors). I've managed to run it over a vastly smaller data set without issues. Curiously, when I run it over a slightly smaller data set of ~230GB (using sort-based shuffle), my job also fails, but I see no shuffle errors in the executor logs. All I see is the error below from the driver (this is also what the driver prints when erroring out on the large data set, but I assumed the executor errors to be the root cause). Any idea where to look in the interim for more hints? I'll continue to try to get to a minimal repro.
    2014-12-30 21:35:34,539 INFO [sparkDriver-akka.actor.default-dispatcher-14] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map
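Davies's faster repro.py line tags each record with a bit (2 when `pc == 'p'`, 1 otherwise), ORs the tags per key with `reduceByKey`, and keeps the keys whose combined tag is 3, i.e. the keys seen with both kinds of `pc`. Below is a plain-Python sketch of the same logic, with a dict standing in for `reduceByKey`; the function name and sample records are hypothetical. Note that the `lambda (pc, x): ...` tuple-parameter syntax only works in Python 2; in Python 3 the lambda has to index into the pair instead.

```python
def keys_with_both_tags(records):
    """Return the set of x values that occur both with pc == 'p' and with another pc.

    records is an iterable of (pc, x) pairs, like inpdata in the repro script.
    """
    combined = {}
    for pc, x in records:
        bit = 2 if pc == 'p' else 1  # same result as `pc == 'p' and 2 or 1`
        # What reduceByKey(lambda a, b: a | b) computes per key:
        combined[x] = combined.get(x, 0) | bit
    return {x for x, tag in combined.items() if tag == 3}  # 3 == 2 | 1


# Python 3 spelling of the original PySpark pipeline (not executed here):
# inpdata.map(lambda r: (r[1], 2 if r[0] == 'p' else 1)) \
#        .reduceByKey(lambda a, b: a | b) \
#        .filter(lambda kv: kv[1] == 3).collect()
```

Because OR is associative and commutative, the per-key result does not depend on how records are split across partitions, which is what makes it safe for `reduceByKey`.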
Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD
This is the official Cloudera-compiled stack, CDH 5.3.0 - nothing has been done by me, and I presume they are pretty good at building it, so I still suspect the classpath now gets resolved in a different way?
thx, Antony.
On Wednesday, 7 January 2015, 18:55, Sean Owen so...@cloudera.com wrote:
Problems like this are always due to having code compiled for Hadoop 1.x run against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at runtime Hadoop 2.x is used. A common cause is actually bundling Spark / Hadoop classes with your app, when the app should just use the Spark / Hadoop provided by the cluster. It could also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x cluster.
On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid wrote:
Hi, I am using newAPIHadoopRDD to load an RDD from HBase (using PySpark running as yarn-client) - pretty much the standard case demonstrated in hbase_inputformat.py from the examples. The thing is that when trying the very same code on Spark 1.2, I am getting the error below, which, based on similar cases on other forums, suggests an incompatibility between MR1 and MR2. Why would this start happening now? Is it due to some changes in resolving the classpath, which now picks up MR2 jars first while before it was MR1? Is there any workaround for this?
thanks, Antony.
The error:
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
    : java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
        at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
        at org.apache.spark.rdd.RDD.first(RDD.scala:1093)
        at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
        at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500)
        at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Re: FW: No APPLICATION_COMPLETE file created in history server log location upon pyspark job success
Hi Michael,
I think you need to explicitly call sc.stop() on the Spark context for it to shut down properly (this doesn't happen automatically). See https://issues.apache.org/jira/browse/SPARK-2972 for more details.
Andrew
On Wed, Jan 7, 2015 at 3:38 AM, michael.engl...@nomura.com wrote:
Hi, I am currently running PySpark jobs against Spark 1.1.0 on YARN. When I run example Java jobs such as spark-pi, the following files get created:
    bash-4.1$ tree spark-pi-1420624364958
    spark-pi-1420624364958
    ├── APPLICATION_COMPLETE
    ├── EVENT_LOG_1
    └── SPARK_VERSION_1.1.0
    0 directories, 3 files
However, when I run my PySpark job, no APPLICATION_COMPLETE file gets created:
    bash-4.1$ tree pyspark-1420628130353
    pyspark-1420628130353
    ├── EVENT_LOG_1
    └── SPARK_VERSION_1.1.0
    0 directories, 2 files
If I touch the file into this directory, it just appears as not started in the history server UI. I am submitting jobs using spark-submit for now:
    bin/spark-submit --master yarn-client --executor-memory 4G --executor-cores 12 --num-executors 10 --queue highpriority <path to python file>
Is there a setting I am missing for this APPLICATION_COMPLETE file to be created when a PySpark job completes?
Thanks, Michael
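Andrew's advice is to call `sc.stop()` explicitly. One way to make sure that call happens even when the job raises is a small context manager. This is a minimal sketch; `stopping` is a hypothetical helper, not a PySpark API (the standard library's `contextlib.closing` calls `close()`, not `stop()`, so it does not fit directly). It works with any object that has a `stop()` method, so it can be tried without a cluster.

```python
from contextlib import contextmanager


@contextmanager
def stopping(context):
    """Yield context and always call context.stop() on exit, even after an error."""
    try:
        yield context
    finally:
        context.stop()


# Intended use in a PySpark job (not executed here):
# from pyspark import SparkContext
# with stopping(SparkContext(appName="my-job")) as sc:
#     sc.parallelize(range(10)).count()
```

The sketch only guarantees that `stop()` is called; whether that is enough for APPLICATION_COMPLETE to appear on Spark 1.1.0 is what Andrew's reply and SPARK-2972 indicate.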
Re: Spark Trainings/ Professional certifications
Hi Saurabh,
In your area, Big Data Partnership provides Spark training: http://www.bigdatapartnership.com/
As Sean mentioned, there is a certification program via a partnership between O'Reilly Media and Databricks: http://www.oreilly.com/go/sparkcert That is offered in two ways: in person at events such as Strata + Hadoop World http://strataconf.com/ and also as an online test managed through Kryterion. Sign up on the O'Reilly page.
There are also two MOOCs starting soon on edX through the University of California:
- Intro to Big Data with Apache Spark by Prof. Anthony Joseph, UC Berkeley, begins Feb 23: https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x#.VK1pkmTF8gk
- Scalable Machine Learning by Prof. Ameet Talwalkar, UCLA, begins Apr 14: https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x#.VK1pkmTF8gk
For coaching, you might be best off talking with consultants, especially for near-term needs. Contact me off-list and I can help provide intros in your area.
Thanks, Paco
Join stuck in the last stage
Hello, I have a problem joining two tables in Spark. I have tried to do it via both Spark SQL and the RDD API, but with no progress so far. I basically have two tables: ACCOUNTS with 16 million records and TRANSACTIONS with 2.5 billion records. When I try to join the tables (please see the code), the job gets stuck in the last stage for a very long time (please see the console output). After about 2 hours it writes a weird exception to the output: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0. I have tried several strategies (repartitioning the RDDs, broadcasting the smaller one), but the result is always the same. Does anybody have an idea what is happening? Source code: AccJoin.java (http://apache-spark-user-list.1001560.n3.nabble.com/file/n21018/AccJoin.java). Console output: AccJoin_0.html (http://apache-spark-user-list.1001560.n3.nabble.com/file/n21018/AccJoin_0.html). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Join-stucks-in-the-last-stage-step-tp21018.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
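The broadcast strategy mentioned in the post replaces the shuffle join with a map-side (hash) join: the smaller table is held in memory and each partition of the larger table is matched locally. A minimal sketch of the per-partition logic, assuming transactions arrive as (account_id, txn) pairs and the accounts have been collected into a Python dict; the function name broadcast_join_partition and this record layout are hypothetical, not taken from AccJoin.java. It is plain Python so it can be checked without a cluster:

```python
from typing import Any, Dict, Hashable, Iterable, Iterator, Tuple


def broadcast_join_partition(
    transactions: Iterable[Tuple[Hashable, Any]],
    accounts_lookup: Dict[Hashable, Any],
) -> Iterator[Tuple[Hashable, Tuple[Any, Any]]]:
    """Inner-join one partition of (account_id, txn) pairs against an
    in-memory dict mapping account_id -> account record.

    Because the lookup side is already local (e.g. a broadcast value),
    no shuffle is needed: each transaction is matched where it sits.
    """
    for account_id, txn in transactions:
        # Inner-join semantics: transactions without a matching account are dropped.
        if account_id in accounts_lookup:
            # Same (left, right) value layout that a pair-RDD join produces.
            yield account_id, (txn, accounts_lookup[account_id])
```

In PySpark this could be wired up roughly as `accounts_bc = sc.broadcast(accounts_rdd.collectAsMap())` followed by `transactions_rdd.mapPartitions(lambda it: broadcast_join_partition(it, accounts_bc.value))`, where `accounts_rdd` and `transactions_rdd` are hypothetical pair RDDs. The trade-off is memory: the whole accounts dict (16 million entries here) must fit on the driver and in every executor, so this only helps if that is feasible; it is not a diagnosis of the MetadataFetchFailedException itself.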
calculating the mean of SparseVector RDD
I have an RDD of SparseVectors and I'd like to calculate the mean, returning a dense vector. I've tried doing this with the following (using PySpark, Spark v1.2.0):

    def aggregate_partition_values(vec1, vec2):
        vec1[vec2.indices] += vec2.values
        return vec1

    def aggregate_combined_vectors(vec1, vec2):
        if all(vec1 == vec2):
            # then the vector came from only one partition
            return vec1
        else:
            return vec1 + vec2

    means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values, aggregate_combined_vectors)
    means = means / nvals

This turns out to be really slow, and the run time doesn't seem to depend on how many vectors there are, so there seems to be some overhead somewhere that I'm not understanding. Is there a better way of doing this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
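The combOp in the question returns only one argument whenever two partial sums happen to be equal, which silently drops a partition's contribution, and nvals has to be computed separately. A minimal sketch that avoids both by carrying a (sum, count) pair through a single aggregate, assuming PySpark's `RDD.aggregate(zeroValue, seqOp, combOp)` and vectors exposing `.indices` and `.values` like `pyspark.mllib.linalg.SparseVector`. The names seq_op, comb_op, mean_from and the StandInSparse class are hypothetical; the stand-in exists only so the sketch runs without Spark:

```python
from typing import NamedTuple, Sequence, Tuple

import numpy as np


class StandInSparse(NamedTuple):
    """Hypothetical stand-in for pyspark.mllib.linalg.SparseVector, used only
    so this sketch runs without Spark; it exposes the same two attributes."""
    indices: Sequence[int]
    values: Sequence[float]


def seq_op(acc: Tuple[np.ndarray, int], vec) -> Tuple[np.ndarray, int]:
    """Add one sparse vector into the dense running sum and bump the count.

    Modifies the accumulator's array in place and returns it, avoiding a new
    allocation per vector. SparseVector indices are unique, so the
    fancy-indexed += adds each value exactly once.
    """
    total, count = acc
    total[np.asarray(vec.indices, dtype=np.intp)] += np.asarray(vec.values, dtype=np.float64)
    return total, count + 1


def comb_op(a: Tuple[np.ndarray, int], b: Tuple[np.ndarray, int]) -> Tuple[np.ndarray, int]:
    """Merge two partial (sum, count) results: always add, never compare."""
    return a[0] + b[0], a[1] + b[1]


def mean_from(acc: Tuple[np.ndarray, int]) -> np.ndarray:
    """Turn the final (sum, count) into a dense mean vector.
    A count of 0 (empty RDD) yields NaNs, so check for that upstream."""
    total, count = acc
    return total / count
```

With Spark, this would be called as `mean_from(vals.aggregate((np.zeros(vec_len), 0), seq_op, comb_op))`. It fixes correctness and saves the extra count pass, but it does not by itself explain the fixed overhead described above. If your PySpark version provides `pyspark.mllib.stat.Statistics.colStats`, its `mean()` computes column means on the JVM side and may be worth comparing.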