Re: Serialization issue when using HBase with Spark
Just to point out a bug in your code: you should not use `mapPartitions` like that. For details, I recommend the "setup() and cleanup()" section of Sean Owen's post: http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

Best Regards,
Shixiong Zhu

2014-12-14 16:35 GMT+08:00 Yanbo yanboha...@gmail.com:

In #1, class HTable is not serializable. You also need to check your self-defined function getUserActions and make sure it is a member function of a class that implements the Serializable interface.

Sent from my iPad

On Dec 12, 2014, at 4:35 PM, yangliuyu yangli...@163.com wrote:

The scenario is using an HTable instance to scan multiple rowkey ranges in Spark tasks, which looks like the code below.

Option 1:

val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(x => x)
  .mapPartitions(iterator => {
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "actions")
    val result = iterator.map { userId =>
      (userId, getUserActions(table, userId, timeStart, timeStop))
    }
    table.close()
    result
  })

But I got the exception:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)
    ...
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

The reason for not using sc.newAPIHadoopRDD is that it only supports one scan at a time:

val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

And if using MultiTableInputFormat, the driver cannot put all rowkeys into the HBaseConfiguration.

Option 2:

sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

I could divide all rowkey ranges into several parts and then use option 2, but I prefer option 1. So is there any solution for option 1?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
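A minimal sketch of the setup/cleanup pattern the reply points to, assuming getUserActions, timeStart and timeStop from the original post are serializable and in scope: the table is opened once per partition, the partition is materialized while the table is still open, and nothing HBase-related is captured from the driver.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable

val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(x => x)
  .mapPartitions { iterator =>
    val conf = HBaseConfiguration.create()        // created per partition, never shipped from the driver
    val table = new HTable(conf, "actions")
    // force evaluation while the table is still open, then close it
    val results = iterator.map { userId =>
      (userId, getUserActions(table, userId, timeStart, timeStop))
    }.toList
    table.close()
    results.iterator
  }

The original code returned the lazy iterator and closed the table before it was consumed; materializing the partition (or wrapping the iterator so close() runs on exhaustion) avoids that. If the Task-not-serializable error persists, it usually means the closure still captures something holding a Configuration (for example, the class that defines getUserActions); marking that field @transient or moving the helper into a serializable object normally resolves it.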
RDD vs Broadcast
We are developing a Spark framework in which we move historical data into RDDs. An RDD is an immutable, read-only dataset on which we perform operations such as filtering and mapping. We have moved our historical data into RDDs, and now there is a use case where a subset of the data in an RDD gets updated and we have to recompute the values. So far I have been able to think of the approaches below.

Approach 1 - broadcast the change:
1. I have already filtered the historical RDD on scope.
2. Whenever there is an update to the values, I apply a map phase on the RDD from step 1, doing a lookup on the broadcast variable and thereby creating a new RDD.
3. I then redo all the computations on this new RDD from step 2.

Approach 2:
1. Maintain the historical data RDDs.
2. Maintain "delta" RDDs on top of the historical data. Since there are initially no updates, the delta starts as an empty RDD.
3. Whenever there is an update to the values, create a new delta RDD and discard the old one.
4. Recompute the values by joining the historical RDDs with the delta RDDs (see the sketch after this message).

Approach 3: I had also thought of making the delta RDD a streaming RDD, where I keep updating the same RDD and recomputing. But as far as I understand, Spark Streaming takes input from sources such as Flume or Kafka, whereas in my case the values are generated within the application itself based on user interaction, so I cannot see any integration point for streaming in my context.

Any suggestion on which approach is better, or any other approach suitable for this scenario? TIA!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-vs-Broadcast-tp20682.html
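A minimal sketch of Approach 2's join step, using hypothetical example data (the historical RDD keyed by record id, and a small delta of updates for a subset of ids):

import org.apache.spark.SparkContext._

// hypothetical data: historical values keyed by record id, plus a delta of updates
val historical = sc.parallelize(Seq((1L, 10.0), (2L, 20.0), (3L, 30.0)))
val delta      = sc.parallelize(Seq((2L, 25.0)))

// left outer join: take the delta value where present, otherwise keep the historical value
val current = historical.leftOuterJoin(delta).mapValues {
  case (oldValue, Some(newValue)) => newValue
  case (oldValue, None)           => oldValue
}
// downstream computations are then re-run on `current` instead of `historical`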
Re: Adding a column to a SchemaRDD
Hi Nathan,

#1 The Spark SQL DSL can satisfy your requirement. You can refer to the following code snippet:

jdata.select(Star(None), 'seven.getField("mod"), 'eleven.getField("mod"))

You need to import org.apache.spark.sql.catalyst.analysis.Star in advance.

#2 After you make the transformation above, you do not need to build the SchemaRDD manually, because jdata.select() returns a SchemaRDD that you can operate on directly. For example, the following code snippet will return a new SchemaRDD with a longer Row:

val t1 = jdata.select(Star(None), 'seven.getField("mod") + 'eleven.getField("mod") as 'mod_sum)

You can use t1.printSchema() to print the schema of this SchemaRDD and check whether it satisfies your requirements.

2014-12-13 0:00 GMT+08:00 Nathan Kronenfeld nkronenf...@oculusinfo.com:

(1) I understand about immutability; that's why I said I wanted a new SchemaRDD.
(2) I specifically asked for a non-SQL solution that takes a SchemaRDD and results in a new SchemaRDD with one new function.
(3) The DSL stuff is a big clue, but I can't find adequate documentation for it.

What I'm looking for is something like:

import org.apache.spark.sql._
val sqlc = new SQLContext(sc)
import sqlc._

val data = sc.parallelize(0 to 99).map(n =>
  ("{\"seven\": {\"mod\": %d, \"times\": %d}, " +
   "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n % 11, n * 11))
val jdata = sqlc.jsonRDD(data)
jdata.registerTempTable("jdata")
val sqlVersion = sqlc.sql("SELECT *, (seven.mod + eleven.mod) AS modsum FROM jdata")

This sqlVersion works fine, but if I try to do the same thing with a programmatic function, I'm missing a bunch of pieces:

- I assume I'd need to start with something like jdata.select('*, 'seven.mod, 'eleven.mod) and then get and process the last two elements. The problems are: I can't select '* - there seems to be no way to get the complete row - and I can't select 'seven.mod or 'eleven.mod - the symbol evaluation seems to go only one level deep.
- Assuming I could do that, I don't see a way to make the result into a SchemaRDD. I assume I would have to do something like:
  1. take my row and value, and create a new, slightly longer row
  2. take my old schema, and create a new schema with one more field at the end, named and typed appropriately
  3. combine the two into a SchemaRDD
  I think I see how to do 3, but 1 and 2 elude me.

Is there more complete documentation somewhere for the DSL portion? Anyone have a clue about any of the above?

On Fri, Dec 12, 2014 at 6:01 AM, Yanbo Liang yanboha...@gmail.com wrote:

An RDD is immutable, so you cannot modify it. If you want to modify some value or schema in an RDD, use map to generate a new RDD. The following code is for your reference:

def add(a: Int, b: Int): Int = a + b
val d1 = sc.parallelize(1 to 10).map { i => (i, i + 1, i + 2) }
val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2)) }
d2.foreach(println)

Otherwise, if your self-defined function is straightforward and you can represent it in SQL, using Spark SQL or the DSL is also a good choice:

case class Person(id: Int, score: Int, value: Int)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val d1 = sc.parallelize(1 to 10).map { i => Person(i, i + 1, i + 2) }
val d2 = d1.select('id, 'score, 'id + 'score)
d2.foreach(println)

2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld nkronenf...@oculusinfo.com:

Hi, there. I'm trying to understand how to augment data in a SchemaRDD.
I can see how to do it if I can express the added values in SQL - just run "SELECT *, valueCalculation AS newColumnName FROM table". I've been searching all over for how to do this if my added value is a Scala function, with no luck.

Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a new column, D, calculated using Utility.process(b, c), and I want (of course) to pass in the values B and C from each row, ending up with a new SchemaRDD with columns A, B, C, and D. Is this possible? If so, how?

Thanks,
-Nathan

--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenf...@oculusinfo.com
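If SQL is acceptable as glue, one way to apply an arbitrary Scala function per row without hand-building a new schema is to register it as a UDF - a sketch, assuming a SchemaRDD `schemaRDD` with columns a, b, c, and treating Utility.process as a stand-in with a hypothetical body:

// stand-in for the user's real function (hypothetical body)
object Utility { def process(b: Int, c: Int): Int = b + c }

schemaRDD.registerTempTable("t")                       // SchemaRDD with columns a, b, c
sqlContext.registerFunction("process", (b: Int, c: Int) => Utility.process(b, c))
val withD = sqlContext.sql("SELECT *, process(b, c) AS d FROM t")
withD.printSchema()                                    // a, b, c, d - and withD is itself a SchemaRDD

This keeps the per-row logic in Scala while letting Spark SQL take care of extending the schema.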
Re: Does filter on an RDD scan every data item ?
Thanks! Shall try it out.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-filter-on-an-RDD-scan-every-data-item-tp20170p20683.html
Run Spark job on Playframework + Spark Master/Worker in one Mac
Hi all,

I am trying to run a Spark job on Play Framework + Spark master/worker on one Mac. When the job ran, I encountered java.lang.ClassNotFoundException. Would you teach me how to solve it?

Here is my code on GitHub: https://github.com/TomoyaIgarashi/spark_cluster_sample

* Environment:
Mac 10.9.5
Java 1.7.0_71
Play Framework 2.2.3
Spark 1.1.1

* Setup history:
cd ~
git clone g...@github.com:apache/spark.git
cd spark
git checkout -b v1.1.1 v1.1.1
sbt/sbt assembly
vi ~/.bashrc
  export SPARK_HOME=/Users/tomoya/spark
. ~/.bashrc
hostname
  Tomoya-Igarashis-MacBook-Air.local
vi $SPARK_HOME/conf/slaves
  Tomoya-Igarashis-MacBook-Air.local
play new spark_cluster_sample
  (default name, type: scala)

* Run history:
$SPARK_HOME/sbin/start-all.sh
jps
which play
  /Users/tomoya/play/play
git clone https://github.com/TomoyaIgarashi/spark_cluster_sample
cd spark_cluster_sample
play run

* Error trace:
Here is the error trace in a Gist: https://gist.github.com/TomoyaIgarashi/4bd45ab3685a532f5511

Regards
Why my SQL UDF cannot be registered?
Hi,

I tried to create a function that converts a Unix timestamp to the hour number in a day. It works if the code is like this:

sqlContext.registerFunction("toHour", (x: Long) => new java.util.Date(x * 1000).getHours)

But if I do it like this, it doesn't work:

def toHour(x: Long) = new java.util.Date(x * 1000).getHours
sqlContext.registerFunction("toHour", toHour)

The system reports an error:

console:23: error: missing arguments for method toHour;
follow this method with `_' if you want to treat it as a partially applied function
       sqlContext.registerFunction("toHour", toHour)

Can anyone help with this error?
Re: Run Spark job on Playframework + Spark Master/Worker in one Mac
Try the workaround (addClassPathJars(sparkContext, this.getClass.getClassLoader)) discussed in http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3CCAJOb8buD1B6tUtOfG8_Ok7F95C3=r-zqgffoqsqbjdxd427...@mail.gmail.com%3E

Thanks,
Aniket
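If that workaround does not apply, another common fix for this ClassNotFoundException is to ship the application's classes to the workers explicitly when building the context inside the Play app - a sketch, with a hypothetical assembly-jar path (the master URL uses the hostname from the original post and Spark's default standalone port):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://Tomoya-Igarashis-MacBook-Air.local:7077")
  .setAppName("spark_cluster_sample")
  // hypothetical path: package the Play app (e.g. with sbt-assembly) and point Spark at the jar
  .setJars(Seq("/path/to/spark_cluster_sample-assembly.jar"))
val sc = new SparkContext(conf)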
Re: Serialization issue when using HBase with Spark
"The reason for not using sc.newAPIHadoopRDD is that it only supports one scan at a time."

I am not sure that's true. You can use multiple scans as follows:

val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings: _*)

where convertScanToString is implemented as:

/**
 * Serializes an HBase scan into a string.
 * @param scan Scan to serialize.
 * @return Base64-encoded serialized scan.
 */
private def convertScanToString(scan: Scan) = {
  val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}

Thanks,
Aniket
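Putting the pieces together, a sketch of a multi-scan read with MultiTableInputFormat, assuming HBase 0.98 client APIs and a hypothetical list of rowkey ranges against the "actions" table from the original question:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}

val conf = HBaseConfiguration.create()

// hypothetical rowkey ranges; one Scan per range, each carrying its table name
val rowkeyRanges = Seq(("user000", "user100"), ("user500", "user600"))
val scans = rowkeyRanges.map { case (start, stop) =>
  val scan = new Scan(Bytes.toBytes(start), Bytes.toBytes(stop))
  scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("actions"))
  scan
}
conf.setStrings(MultiTableInputFormat.SCANS,
  scans.map(s => Base64.encodeBytes(ProtobufUtil.toScan(s).toByteArray)): _*)

val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])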
Re: Spark with HBase
In case you are still looking for help, there has been multiple discussions in this mailing list that you can try searching for. Or you can simply use https://github.com/unicredit/hbase-rdd :-) Thanks, Aniket On Wed Dec 03 2014 at 16:11:47 Ted Yu yuzhih...@gmail.com wrote: Which hbase release are you running ? If it is 0.98, take a look at: https://issues.apache.org/jira/browse/SPARK-1297 Thanks On Dec 2, 2014, at 10:21 PM, Jai jaidishhari...@gmail.com wrote: I am trying to use Apache Spark with a psuedo distributed Hadoop Hbase Cluster and I am looking for some links regarding the same. Can someone please guide me through the steps to accomplish this. Thanks a lot for Helping -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-HBase-tp20226.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
HiveQL support in Cassandra-Spark connector
Hi, I just wonder if Cassandra-Spark connector supports executing HiveQL on Cassandra tables? best, /Shahab
Re: JSON Input files
Hi Helena and all,

I found one example of reading a multi-line JSON file into an RDD using https://github.com/alexholmes/json-mapreduce:

val data = sc.newAPIHadoopFile(
  filepath,
  classOf[MultiLineJsonInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf
).map(p => (p._1.get, p._2.toString))
data.count

It is expecting a Conf object. What conf value do I need to specify, and how do I specify it? The MultiLineJsonInputFormat class is expecting a "member" value. How do I pass the member value? Otherwise I get the exception below:

java.io.IOException: Missing configuration value for multilinejsoninputformat.member
    at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:115)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:103)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Please let me know how to resolve this issue.

Regards,
Rajesh

On Sun, Dec 14, 2014 at 7:21 PM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote:

Thank you Yanbo

Regards,
Rajesh

On Sun, Dec 14, 2014 at 3:15 PM, Yanbo yanboha...@gmail.com wrote:

Pay attention to your JSON file; try to change it like the following, with each record represented as a single JSON string:

{"NAME": "Device 1", "GROUP": "1", "SITE": "qqq", "DIRECTION": "East"}
{"NAME": "Device 2", "GROUP": "2", "SITE": "sss", "DIRECTION": "North"}

On Dec 14, 2014, at 5:01 PM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote:

{
  "Device 1": {"NAME": "Device 1", "GROUP": "1", "SITE": "qqq", "DIRECTION": "East"},
  "Device 2": {"NAME": "Device 2", "GROUP": "2", "SITE": "sss", "DIRECTION": "North"}
}
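The missing value is passed through the Hadoop Configuration under the key shown in the exception - a sketch, where the member name ("NAME" here) is a hypothetical choice; it must be whichever JSON member marks each record in your file (see the json-mapreduce README for the exact semantics):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import com.alexholmes.json.mapreduce.MultiLineJsonInputFormat

val conf = new Configuration()
// key taken from the exception; value is the JSON member that identifies each record
conf.set("multilinejsoninputformat.member", "NAME")

val data = sc.newAPIHadoopFile(
  filepath,
  classOf[MultiLineJsonInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf
).map(p => (p._1.get, p._2.toString))
data.count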
Re: ...FileNotFoundException: Path is not a file: - error on accessing HDFS with sc.wholeTextFiles
Thanks Akhil, In line with your suggestion I have used the following 2 commands to flatten the directory structure: find . -type f -iname '*' -exec mv '{}' . \; find . -type d -exec rm -rf '{}' \; Kind Regards Karen On 12/12/14 13:25, Akhil Das wrote: I'm not quiet sure whether spark will go inside subdirectories and pick up files from it. You could do something like following to bring all files to one directory. find . -iname '*' -exec mv '{}' . \; Thanks Best Regards On Fri, Dec 12, 2014 at 6:34 PM, Karen Murphy k.l.mur...@qub.ac.uk mailto:k.l.mur...@qub.ac.uk wrote: When I try to load a text file from a HDFS path using sc.wholeTextFiles(hdfs://localhost:54310/graphx/anywebsite.com/anywebsite.com/ http://anywebsite.com/anywebsite.com/) I'm get the following error: java.io.FileNotFoundException: Path is not a file: /graphx/anywebsite.com/anywebsite.com/css http://anywebsite.com/anywebsite.com/css (full stack trace at bottom of message). If I switch my Scala code to reading the input file from the local disk, wholeTextFiles doesn't pickup directories (such as css in this case) and there is no exception raised. The trace information in the 'local file' version shows that only plain text files are collected with sc.wholeTextFiles: 14/12/12 11:51:29 INFO WholeTextFileRDD: Input split: Paths:/tmp/anywebsite.com/anywebsite.com/index-2.html:0+6192,/tmp/anywebsite.com/anywebsite.com/gallery.html:0+3258,/tmp/anywebsite.com/anywebsite.com/exhibitions.html:0+6663,/tmp/anywebsite.com/anywebsite.com/jquery.html:0+326,/tmp/anywebsite.com/anywebsite.com/index.html:0+6174,/tmp/anywebsite.com/anywebsite.com/contact.html:0+3050,/tmp/anywebsite.com/anywebsite.com/archive.html:0+3247 http://anywebsite.com/anywebsite.com/index-2.html:0+6192,/tmp/anywebsite.com/anywebsite.com/gallery.html:0+3258,/tmp/anywebsite.com/anywebsite.com/exhibitions.html:0+6663,/tmp/anywebsite.com/anywebsite.com/jquery.html:0+326,/tmp/anywebsite.com/anywebsite.com/index.html:0+6174,/tmp/anywebsite.com/anywebsite.com/contact.html:0+3050,/tmp/anywebsite.com/anywebsite.com/archive.html:0+3247 Yet the trace information in the 'HDFS file' version shows directories too are collected with sc.wholeTextFiles: 14/12/12 11:49:07 INFO WholeTextFileRDD: Input split: Paths:/graphx/anywebsite.com/anywebsite.com/archive.html:0+3247,/graphx/anywebsite.com/anywebsite.com/contact.html:0+3050,/graphx/anywebsite.com/anywebsite.com/css:0+0,/graphx/anywebsite.com/anywebsite.com/exhibitions.html:0+6663,/graphx/anywebsite.com/anywebsite.com/gallery.html:0+3258,/graphx/anywebsite.com/anywebsite.com/highslide:0+0,/graphx/anywebsite.com/anywebsite.com/highslideIndex:0+0,/graphx/anywebsite.com/anywebsite.com/images:0+0,/graphx/anywebsite.com/anywebsite.com/index-2.html:0+6192,/graphx/anywebsite.com/anywebsite.com/index.html:0+6174,/graphx/anywebsite.com/anywebsite.com/jquery.html:0+326,/graphx/anywebsite.com/anywebsite.com/js:0+0 
http://anywebsite.com/anywebsite.com/archive.html:0+3247,/graphx/anywebsite.com/anywebsite.com/contact.html:0+3050,/graphx/anywebsite.com/anywebsite.com/css:0+0,/graphx/anywebsite.com/anywebsite.com/exhibitions.html:0+6663,/graphx/anywebsite.com/anywebsite.com/gallery.html:0+3258,/graphx/anywebsite.com/anywebsite.com/highslide:0+0,/graphx/anywebsite.com/anywebsite.com/highslideIndex:0+0,/graphx/anywebsite.com/anywebsite.com/images:0+0,/graphx/anywebsite.com/anywebsite.com/index-2.html:0+6192,/graphx/anywebsite.com/anywebsite.com/index.html:0+6174,/graphx/anywebsite.com/anywebsite.com/jquery.html:0+326,/graphx/anywebsite.com/anywebsite.com/js:0+0 14/12/12 11:49:07 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1) java.io.FileNotFoundException: Path is not a file: /graphx/anywebsite.com/anywebsite.com/css http://anywebsite.com/anywebsite.com/css at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:68) at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54) Should the HDFS version behave the same as the local version of wholeTextFiles as far as the treatment of directories/non plain text files are concerned ? Any help, advice or workaround suggestions would be much appreciated, Thanks Karen VERSION INFO Ubuntu 14.04 Spark 1.1.1 Hadoop 2.5.2 Scala 2.10.4 FULL STACK TRACE 14/12/12 12:02:31 INFO WholeTextFileRDD: Input split:
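One way to sidestep the directory entries on HDFS is to glob only the files you actually want, so wholeTextFiles never sees the css/js/images directories - a sketch using the path from the post:

// read only the HTML files instead of the whole directory tree;
// directory entries such as .../css no longer match the glob, so no
// "Path is not a file" exception is raised
val pages = sc.wholeTextFiles(
  "hdfs://localhost:54310/graphx/anywebsite.com/anywebsite.com/*.html")
pages.count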
Re: SchemaRDD partition on specific column values?
Hi Michael, I have opened following JIRA for the same :- https://issues.apache.org/jira/browse/SPARK-4849 I am having a look at the code to see what can be done and then we can have a discussion over the approach. Let me know if you have any comments/suggestions. Thanks -Nitin On Sun, Dec 14, 2014 at 2:53 PM, Michael Armbrust mich...@databricks.com wrote: I'm happy to discuss what it would take to make sure we can propagate this information correctly. Please open a JIRA (and mention me in it). Regarding including it in 1.2.1, it depends on how invasive the change ends up being, but it is certainly possible. On Thu, Dec 11, 2014 at 3:55 AM, nitin nitin2go...@gmail.com wrote: Can we take this as a performance improvement task in Spark-1.2.1? I can help contribute for this. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-partition-on-specific-column-values-tp20350p20623.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards Nitin Goyal
Migrating Parquet inputs
Hi, is there an easy way to “migrate” parquet files or indicate optional values in sql statements? I added a couple of new fields that I also use in a schemaRDD.sql() which obviously fails for input files that don’t have the new fields. Thanks - Marius - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Why my SQL UDF cannot be registered?
As the error log shows, you need to register it as:

sqlContext.registerFunction("toHour", toHour _)

The "_" means you are passing the function as a parameter, not invoking it.

Cheng Hao

From: Xuelin Cao [mailto:xuelin...@yahoo.com.INVALID]
Sent: Monday, December 15, 2014 5:28 PM
To: User
Subject: Why my SQL UDF cannot be registered?

Hi,

I tried to create a function that converts a Unix timestamp to the hour number in a day. It works if the code is like this:

sqlContext.registerFunction("toHour", (x: Long) => new java.util.Date(x * 1000).getHours)

But if I do it like this, it doesn't work:

def toHour(x: Long) = new java.util.Date(x * 1000).getHours
sqlContext.registerFunction("toHour", toHour)

The system reports an error:

console:23: error: missing arguments for method toHour;
follow this method with `_' if you want to treat it as a partially applied function
       sqlContext.registerFunction("toHour", toHour)

Can anyone help with this error?
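A quick end-to-end check of the fix, assuming a hypothetical registered table `logs` with a Unix-timestamp column `ts`:

def toHour(x: Long) = new java.util.Date(x * 1000).getHours
sqlContext.registerFunction("toHour", toHour _)   // trailing underscore: pass the method as a function value
val hours = sqlContext.sql("SELECT toHour(ts) FROM logs")
hours.take(5).foreach(println)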
Re: java.lang.IllegalStateException: unread block data
When you say restored, does it mean the internal IP/public IP remain unchanged to you changed them accordingly? (I'm assuming you are using a cloud service like AWS, GCE or Azure). What is the serializer that you are using? Try to set the following before creating the sparkContext, might help with Serialization and all System.setProperty(spark.serializer, spark.KryoSerializer) System.setProperty(spark.kryo.registrator, com.sigmoidanalytics.MyRegistrator) Morbious wrote Hi, Recently I installed Cloudera Hadoop 5.1.1 with spark. I shut down slave servers and than restored them back. After this operation I was trying to run any task but each task with file bigger than few megabytes ended with errors: 14/12/12 20:25:02 WARN scheduler.TaskSetManager: Lost TID 61 (task 1.0:61) 14/12/12 20:25:02 WARN scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException java.lang.IllegalStateException: unread block data at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:140) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 14/12/12 20:25:02 WARN scheduler.TaskSetManager: Lost TID 62 (task 1.0:62) 14/12/12 20:25:02 INFO scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 1] 14/12/12 20:25:02 WARN scheduler.TaskSetManager: Lost TID 63 (task 1.0:63) 14/12/12 20:25:02 INFO scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 2] 14/12/12 20:25:02 WARN scheduler.TaskSetManager: Lost TID 64 (task 1.0:64) 14/12/12 20:25:02 INFO scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 3] 14/12/12 20:25:02 WARN scheduler.TaskSetManager: Lost TID 60 (task 1.0:60) I checked security limits but everything seems to be OK. Before restart I was able to use word count on 100GB file, now it can be done only on few mb file. Best regards, Morbious -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalStateException-unread-block-data-tp20668p20684.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
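For what it's worth, on Spark 1.x these settings normally go on the SparkConf, and the Kryo serializer class name is fully qualified - a sketch (the registrator below is the placeholder class from the reply; substitute your own, or drop that line if you have no custom registrator):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.sigmoidanalytics.MyRegistrator") // replace with your registrator class
val sc = new SparkContext(conf)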
is there a way to interact with Spark clusters remotely?
Hi experts,

I am wondering if there is a way to interact with Spark remotely, i.e. with no direct access to the clusters required, but submitting Python/Scala scripts to the cluster and getting results back via (REST) APIs. That would facilitate the development process a lot.

Xiaoyong
Re: Pagerank implementation
Hiya, I too am looking for a PageRank solution in GraphX where the probabilities sum to 1. I tried a few modifications, including division by the total number of vertices in the first part of the equation, as well as trying to return full rank instead of delta (though not correctly as evident from exception at runtime). Tom did you manage to make a version which sums to 1 ? Could you possibly divulge the changes if so ? Also, I'm interested to know if the algorithm handles the case where there are no outgoing links from a node ? Does it avoid unfairness with sinks ? I'm new to Scala (and spark). Had a look at the code and don't see that it is, but could be missing something, Thanks Karen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pagerank-implementation-tp19013p20687.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
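Not sure if it helps, but a simple way to get ranks that sum to 1 is to normalize GraphX's output after the fact - a sketch assuming a Graph named `graph` is already loaded:

import org.apache.spark.SparkContext._

// run PageRank, then divide every rank by the total so the values sum to 1
val ranks = graph.pageRank(0.0001).vertices     // RDD[(VertexId, Double)]
val total = ranks.map(_._2).sum()
val normalized = ranks.mapValues(_ / total)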
integrating long-running Spark jobs with Thriftserver
Hi everybody, I apologize if the answer to my question is obvious but I haven't been able to find a straightforward solution anywhere on the internet. I have a number of Spark jobs written using the python API that do things like read in data from Amazon S3 to a main table in the Hive metastore, perform intensive calculations on that data to build derived/aggregated tables, etc. I also have Tableau set up to read those tables via the Spark Thriftserver. My question is how best to integrate those two sides of Spark. I want to have the Thriftserver constantly running so that Tableau can update its extracts on a scheduled basis and users can manually query those tables as needed, but I also need to run those python jobs on a scheduled basis as well. What's the best way to do that? The options I'm considering are as follows: 1. Simply call the python jobs via spark-submit, scheduled by cron. My concern here is concurrency issues if Tableau or a user tries to read from a table at the same time that a job is rebuilding/updating that table. To my understanding the Thriftserver is designed to handle concurrency, but Spark in general is not if two different Spark contexts are attempting to access the same data (as would be the case with this approach.) Am I correct in that thinking or is there actually no problem with this method? 2. Call the python jobs through the Spark Thriftserver so that the same Spark context is used. My question here is how to do that. I know one can call a python script as part of a HiveQL query using TRANSFORM, but that seems to be designed more for performing quick calculations on existing data as part of a query rather than building tables in the first place or calling long-running jobs that don't return anything (again, am I correct in this thinking or would this actually be a viable solution?) Is there a different way to call long-running Spark jobs via the Thriftserver? Are either of these good approaches or is there a better way that I'm missing? Thanks!
Serialize mllib's MatrixFactorizationModel
Hi all.

I want to serialize and later load a model trained using MLlib's ALS. I've tried using Java serialization with something like:

val model = ALS.trainImplicit(training, rank, numIter, lambda, 1)
val fos = new FileOutputStream("model.bin")
val oos = new ObjectOutputStream(fos)
oos.writeObject(bestModel.get)

But when I try to deserialize it using:

val fos = new FileInputStream("model.bin")
val oos = new ObjectInputStream(fos)
val model = oos.readObject().asInstanceOf[MatrixFactorizationModel]

I get the error:

Exception in thread "main" java.io.IOException: PARSING_ERROR(2)

I've also tried to serialize MatrixFactorizationModel's two RDDs (products and users) and later create the MatrixFactorizationModel by hand, passing the RDDs to the constructor, but I get an error because it is private:

Error:(58, 17) constructor MatrixFactorizationModel in class MatrixFactorizationModel cannot be accessed in object RecommendALS
    val model = new MatrixFactorizationModel(8, userFeatures, productFeatures)

Any ideas?

Thanks!

--
Albert Manyà
alber...@eml.cc
Re: NullPointerException When Reading Avro Sequence Files
To me this looks like an internal error to the REPL. I am not sure what is causing that. Personally I never use the REPL, can you try typing up your program and running it from an IDE or spark-submit and see if you still get the same error? Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini On Mon, Dec 15, 2014 at 4:54 PM, Cristovao Jose Domingues Cordeiro cristovao.corde...@cern.ch wrote: Sure, thanks: warning: there were 1 deprecation warning(s); re-run with -deprecation for details java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283) at org.apache.hadoop.mapreduce.Job.toString(Job.java:462) at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324) at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329) at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337) at .init(console:10) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:846) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1119) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:672) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:703) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:667) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:819) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:864) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:776) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:619) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:627) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:632) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:959) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:907) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:907) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:907) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1002) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Could something you omitted in your snippet be chaining this exception? Cumprimentos / Best regards, Cristóvão José Domingues Cordeiro IT Department - 28/R-018 CERN -- *From:* Simone Franzini [captainfr...@gmail.com] *Sent:* 15 December 2014 16:52 *To:* Cristovao Jose Domingues Cordeiro *Subject:* Re: NullPointerException When Reading Avro Sequence Files Ok, I have no idea what that is. That appears to be an internal Spark exception. 
Maybe if you can post the entire stack trace it would give some more details to understand what is going on. Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini On Mon, Dec 15, 2014 at 4:50 PM, Cristovao Jose Domingues Cordeiro cristovao.corde...@cern.ch wrote: Hi, thanks for that. But yeah the 2nd line is an exception. jobread is not created. Cumprimentos / Best regards, Cristóvão José Domingues Cordeiro IT Department - 28/R-018 CERN -- *From:* Simone Franzini [captainfr...@gmail.com] *Sent:* 15 December 2014 16:39 *To:* Cristovao Jose Domingues Cordeiro *Subject:* Re: NullPointerException When Reading Avro Sequence Files I did not mention the imports needed in my code. I think these are all of them: import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.io.NullWritable import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import
Re: spark kafka batch integration
For an alternative take on a similar idea, see https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka An advantage of the approach I'm taking is that the lower and upper offsets of the RDD are known in advance, so it's deterministic. I haven't had a need to write to kafka from spark yet, so that's an obvious advantage of your library. I think the existing kafka dstream is inadequate for a number of use cases, and would really like to see some combination of these approaches make it into the spark codebase. On Sun, Dec 14, 2014 at 2:41 PM, Koert Kuipers ko...@tresata.com wrote: hello all, we at tresata wrote a library to provide for batch integration between spark and kafka (distributed write of rdd to kafa, distributed read of rdd from kafka). our main use cases are (in lambda architecture jargon): * period appends to the immutable master dataset on hdfs from kafka using spark * make non-streaming data available in kafka with periodic data drops from hdfs using spark. this is to facilitate merging the speed and batch layer in spark-streaming * distributed writes from spark-streaming see here: https://github.com/tresata/spark-kafka best, koert
Intermittent test failures
Hi, I’m seeing strange, random errors when running unit tests for my Spark jobs. In this particular case I’m using Spark SQL to read and write Parquet files, and one error that I keep running into is this one: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2) org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78) org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545) I can only prevent this from happening by using isolated Specs tests thats always create a new SparkContext that is not shared between tests (but there can also be only a single SparkContext per test), and also by using standard SQLContext instead of HiveContext. It does not seem to have anything to do with the actual files that I also create during the test run with SQLContext.saveAsParquetFile. Cheers - Marius PS The full trace: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2) org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78) org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545) org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125) org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88) org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128) org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:232) org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169) org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927) org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155) sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:160) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
~[spark-core_2.10-1.1.1.jar:1.1.1] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) ~[spark-core_2.10-1.1.1.jar:1.1.1] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) ~[spark-core_2.10-1.1.1.jar:1.1.1] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library.jar:na] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[scala-library.jar:na] at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) ~[spark-core_2.10-1.1.1.jar:1.1.1] - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
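A minimal sketch of the isolation described above - each test gets its own local SparkContext that is stopped afterwards (names here are illustrative, not a specific test-framework API):

import org.apache.spark.{SparkConf, SparkContext}

def withSparkContext[T](body: SparkContext => T): T = {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[2]").setAppName("test"))
  try body(sc) finally sc.stop()   // always stop, even if the test fails
}

// usage inside a test case
withSparkContext { sc =>
  assert(sc.parallelize(1 to 10).count() == 10)
}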
Re: spark kafka batch integration
thanks! i will take a look at your code. didn't realize there was already something out there. good point about upper offsets, i will add that feature to our version as well if you dont mind. i was thinking about making it deterministic for task failure transparently (even if no upper offsets are provided) by doing a call to get the latest offsets for all partitions, and filter the rdd based on that to make sure nothing beyond those offsets ends up in the rdd. havent had time to test if that works and is robust. On Mon, Dec 15, 2014 at 11:39 AM, Cody Koeninger c...@koeninger.org wrote: For an alternative take on a similar idea, see https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka An advantage of the approach I'm taking is that the lower and upper offsets of the RDD are known in advance, so it's deterministic. I haven't had a need to write to kafka from spark yet, so that's an obvious advantage of your library. I think the existing kafka dstream is inadequate for a number of use cases, and would really like to see some combination of these approaches make it into the spark codebase. On Sun, Dec 14, 2014 at 2:41 PM, Koert Kuipers ko...@tresata.com wrote: hello all, we at tresata wrote a library to provide for batch integration between spark and kafka (distributed write of rdd to kafa, distributed read of rdd from kafka). our main use cases are (in lambda architecture jargon): * period appends to the immutable master dataset on hdfs from kafka using spark * make non-streaming data available in kafka with periodic data drops from hdfs using spark. this is to facilitate merging the speed and batch layer in spark-streaming * distributed writes from spark-streaming see here: https://github.com/tresata/spark-kafka best, koert
Re: Serialize mllib's MatrixFactorizationModel
This class is not going to be serializable, as it contains huge RDDs. Even if the right constructor existed the RDDs inside would not serialize. On Mon, Dec 15, 2014 at 4:33 PM, Albert Manyà alber...@eml.cc wrote: Hi all. I'm willing to serialize and later load a model trained using mllib's ALS. I've tried usign Java serialization with something like: val model = ALS.trainImplicit(training, rank, numIter, lambda, 1) val fos = new FileOutputStream(model.bin) val oos = new ObjectOutputStream(fos) oos.writeObject(bestModel.get) But when I try to deserialize it using: val fos = new FileInputStream(model.bin) val oos = new ObjectInputStream(fos) val model = oos.readObject().asInstanceOf[MatrixFactorizationModel] I get the error: Exception in thread main java.io.IOException: PARSING_ERROR(2) I've also tried to serialize MatrixFactorizationModel's both RDDs (products and users) and later create the MatrixFactorizationModel by hand passing the RDDs by constructor but I get an error cause its private: Error:(58, 17) constructor MatrixFactorizationModel in class MatrixFactorizationModel cannot be accessed in object RecommendALS val model = new MatrixFactorizationModel (8, userFeatures, productFeatures) Any ideas? Thanks! -- Albert Manyà alber...@eml.cc - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: is there a way to interact with Spark clusters remotely?
Hi Xiaoyong, You could refer this post if you are looking on how to run spark jobs remotely http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-job-on-Unix-cluster-from-dev-environment-Windows-td16989.html You will of course require network access to the cluster. Thanks Best Regards On Mon, Dec 15, 2014 at 7:47 PM, Xiaoyong Zhu xiaoy...@microsoft.com wrote: Hi experts I am wondering if there is a way to interactive with Spark remotely? i.e. no access to clusters required but submit Python/Scala scripts to cluster and get result based on (REST) APIs. That will facilitate the development process a lot.. Xiaoyong
Re: Serialize mllib's MatrixFactorizationModel
In that case, what is the strategy to train a model in some background batch process and make recommendations for some other service in real time? Run both processes in the same spark cluster? Thanks. -- Albert Manyà alber...@eml.cc On Mon, Dec 15, 2014, at 05:58 PM, Sean Owen wrote: This class is not going to be serializable, as it contains huge RDDs. Even if the right constructor existed the RDDs inside would not serialize. On Mon, Dec 15, 2014 at 4:33 PM, Albert Manyà alber...@eml.cc wrote: Hi all. I'm willing to serialize and later load a model trained using mllib's ALS. I've tried usign Java serialization with something like: val model = ALS.trainImplicit(training, rank, numIter, lambda, 1) val fos = new FileOutputStream(model.bin) val oos = new ObjectOutputStream(fos) oos.writeObject(bestModel.get) But when I try to deserialize it using: val fos = new FileInputStream(model.bin) val oos = new ObjectInputStream(fos) val model = oos.readObject().asInstanceOf[MatrixFactorizationModel] I get the error: Exception in thread main java.io.IOException: PARSING_ERROR(2) I've also tried to serialize MatrixFactorizationModel's both RDDs (products and users) and later create the MatrixFactorizationModel by hand passing the RDDs by constructor but I get an error cause its private: Error:(58, 17) constructor MatrixFactorizationModel in class MatrixFactorizationModel cannot be accessed in object RecommendALS val model = new MatrixFactorizationModel (8, userFeatures, productFeatures) Any ideas? Thanks! -- Albert Manyà alber...@eml.cc - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: is there a way to interact with Spark clusters remotely?
Have you seen the recent announcement around Spark Kernel using IPython/0MQ protocol ? https://github.com/ibm-et/spark-kernel On Mon, Dec 15, 2014 at 12:06 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Xiaoyong, You could refer this post if you are looking on how to run spark jobs remotely http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-job-on-Unix-cluster-from-dev-environment-Windows-td16989.html You will of course require network access to the cluster. Thanks Best Regards On Mon, Dec 15, 2014 at 7:47 PM, Xiaoyong Zhu xiaoy...@microsoft.com wrote: Hi experts I am wondering if there is a way to interactive with Spark remotely? i.e. no access to clusters required but submit Python/Scala scripts to cluster and get result based on (REST) APIs. That will facilitate the development process a lot.. Xiaoyong -- François /fly Le Lay - @lelayf Data Engineering Chapter Lead IO Tribe NYC Phone : +1 (646)-656-0075
Re: MLLIB model export: PMML vs MLLIB serialization
Thanks Vincenzo. Are you trying out all the models implemented in MLlib? Actually I don't see decision tree there - sorry if I missed it. When are you planning to merge this into the Spark branch?

Thanks
Sourabh

On Sun, Dec 14, 2014 at 5:54 PM, selvinsource [via Apache Spark User List] ml-node+s1001560n20674...@n3.nabble.com wrote:

Hi Sourabh, have a look at https://issues.apache.org/jira/browse/SPARK-1406, I am looking into exporting models in PMML using JPMML.

Regards,
Vincenzo

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-model-export-PMML-vs-MLLIB-serialization-tp20324p20688.html
Re: Serialize mllib's MatrixFactorizationModel
Hi Albert, There is some discussion going on here: http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-model-export-PMML-vs-MLLIB-serialization-tc20324.html#a20674 I am also looking for this solution.But looks like until mllib pmml export is ready, there is no full proof solution to export the mllib trained model to a different system. Thanks Sourabh On Mon, Dec 15, 2014 at 10:39 PM, Albert Manyà alber...@eml.cc wrote: In that case, what is the strategy to train a model in some background batch process and make recommendations for some other service in real time? Run both processes in the same spark cluster? Thanks. -- Albert Manyà alber...@eml.cc On Mon, Dec 15, 2014, at 05:58 PM, Sean Owen wrote: This class is not going to be serializable, as it contains huge RDDs. Even if the right constructor existed the RDDs inside would not serialize. On Mon, Dec 15, 2014 at 4:33 PM, Albert Manyà alber...@eml.cc wrote: Hi all. I'm willing to serialize and later load a model trained using mllib's ALS. I've tried usign Java serialization with something like: val model = ALS.trainImplicit(training, rank, numIter, lambda, 1) val fos = new FileOutputStream(model.bin) val oos = new ObjectOutputStream(fos) oos.writeObject(bestModel.get) But when I try to deserialize it using: val fos = new FileInputStream(model.bin) val oos = new ObjectInputStream(fos) val model = oos.readObject().asInstanceOf[MatrixFactorizationModel] I get the error: Exception in thread main java.io.IOException: PARSING_ERROR(2) I've also tried to serialize MatrixFactorizationModel's both RDDs (products and users) and later create the MatrixFactorizationModel by hand passing the RDDs by constructor but I get an error cause its private: Error:(58, 17) constructor MatrixFactorizationModel in class MatrixFactorizationModel cannot be accessed in object RecommendALS val model = new MatrixFactorizationModel (8, userFeatures, productFeatures) Any ideas? Thanks! -- Albert Manyà alber...@eml.cc - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Serialize mllib's MatrixFactorizationModel
The thing about MatrixFactorizationModel, compared to other models, is that it is huge. It's not just a few coefficients, but whole RDDs of coefficients. I think you could save these RDDs of user/product factors to persistent storage, load them, then recreate the MatrixFactorizationModel that way. It's a bit manual, but works. This is probably why there is no standard PMML representation for this type of model. It is different from classic regression/classification models, and too big for XML. So efforts to export/import PMML are not relevant IMHO. On Mon, Dec 15, 2014 at 5:09 PM, Albert Manyà alber...@eml.cc wrote: In that case, what is the strategy to train a model in some background batch process and make recommendations for some other service in real time? Run both processes in the same spark cluster? Thanks. -- Albert Manyà alber...@eml.cc On Mon, Dec 15, 2014, at 05:58 PM, Sean Owen wrote: This class is not going to be serializable, as it contains huge RDDs. Even if the right constructor existed the RDDs inside would not serialize. On Mon, Dec 15, 2014 at 4:33 PM, Albert Manyà alber...@eml.cc wrote: Hi all. I'm willing to serialize and later load a model trained using mllib's ALS. I've tried usign Java serialization with something like: val model = ALS.trainImplicit(training, rank, numIter, lambda, 1) val fos = new FileOutputStream(model.bin) val oos = new ObjectOutputStream(fos) oos.writeObject(bestModel.get) But when I try to deserialize it using: val fos = new FileInputStream(model.bin) val oos = new ObjectInputStream(fos) val model = oos.readObject().asInstanceOf[MatrixFactorizationModel] I get the error: Exception in thread main java.io.IOException: PARSING_ERROR(2) I've also tried to serialize MatrixFactorizationModel's both RDDs (products and users) and later create the MatrixFactorizationModel by hand passing the RDDs by constructor but I get an error cause its private: Error:(58, 17) constructor MatrixFactorizationModel in class MatrixFactorizationModel cannot be accessed in object RecommendALS val model = new MatrixFactorizationModel (8, userFeatures, productFeatures) Any ideas? Thanks! -- Albert Manyà alber...@eml.cc - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
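A sketch of the manual save/load Sean describes, with hypothetical HDFS paths; a prediction is then just the dot product of the user and product factor vectors:

import org.apache.spark.SparkContext._

// after training: persist the factor RDDs
model.userFeatures.saveAsObjectFile("hdfs:///models/als/userFeatures")
model.productFeatures.saveAsObjectFile("hdfs:///models/als/productFeatures")

// later, in the serving application: reload the factors
val userFeatures = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/userFeatures")
val productFeatures = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/productFeatures")

// the MatrixFactorizationModel constructor is private in 1.x, so score by hand
def predict(user: Int, product: Int): Double = {
  val uf = userFeatures.lookup(user).head
  val pf = productFeatures.lookup(product).head
  uf.zip(pf).map { case (u, p) => u * p }.sum
}

For low-latency serving, collecting the factors (or a subset) to a local map rather than calling lookup per request would usually be preferable.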
Custom UDTF with Lateral View throws ClassNotFound exception in Spark SQL CLI
Hello, I met a problem when using the Spark SQL CLI. A custom UDTF with lateral view throws a ClassNotFound exception. I did a couple of experiments in the same environment (Spark version 1.1.1):
select + same custom UDTF (Passed)
select + lateral view + custom UDTF (ClassNotFoundException)
select + lateral view + built-in UDTF (Passed)
I have done some googling these days and found one related Spark issue ticket, https://issues.apache.org/jira/browse/SPARK-4811, which is about custom UDTFs not working in Spark SQL. It would be helpful to put the actual code here to reproduce the problem, but corporate regulations might prohibit this, so sorry about that. Directly packaging explode's source code in a jar as a custom UDTF should reproduce it anyway. Here is a portion of the stack trace from the exception, just in case: java.lang.ClassNotFoundException: XXX at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at org.apache.spark.sql.hive.HiveFunctionFactory$class.createFunction(hiveUdfs.scala:81) at org.apache.spark.sql.hive.HiveGenericUdtf.createFunction(hiveUdfs.scala:247) at org.apache.spark.sql.hive.HiveGenericUdtf.function$lzycompute(hiveUdfs.scala:254) at org.apache.spark.sql.hive.HiveGenericUdtf.function(hiveUdfs.scala:254) at org.apache.spark.sql.hive.HiveGenericUdtf.outputInspectors$lzycompute(hiveUdfs.scala:261) at org.apache.spark.sql.hive.HiveGenericUdtf.outputInspectors(hiveUdfs.scala:260) at org.apache.spark.sql.hive.HiveGenericUdtf.outputDataTypes$lzycompute(hiveUdfs.scala:265) at org.apache.spark.sql.hive.HiveGenericUdtf.outputDataTypes(hiveUdfs.scala:265) at org.apache.spark.sql.hive.HiveGenericUdtf.makeOutput(hiveUdfs.scala:269) at org.apache.spark.sql.catalyst.expressions.Generator.output(generators.scala:60) at org.apache.spark.sql.catalyst.plans.logical.Generate$$anonfun$1.apply(basicOperators.scala:50) at org.apache.spark.sql.catalyst.plans.logical.Generate$$anonfun$1.apply(basicOperators.scala:50) at scala.Option.map(Option.scala:145) at org.apache.spark.sql.catalyst.plans.logical.Generate.generatorOutput(basicOperators.scala:50) at org.apache.spark.sql.catalyst.plans.logical.Generate.output(basicOperators.scala:60) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveChildren$1.apply(LogicalPlan.scala:79) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveChildren$1.apply(LogicalPlan.scala:79) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) the rest is omitted. Thank you. Shenghua -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-UDTF-with-Lateral-View-throws-ClassNotFound-exception-in-Spark-SQL-CLI-tp20689.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: custom spark app name in yarn-cluster mode
Thanks Sandy, passing --name works fine :) Tomer On Fri, Dec 12, 2014 at 9:35 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Tomer, In yarn-cluster mode, the application has already been submitted to YARN by the time the SparkContext is created, so it's too late to set the app name there. I believe giving it with the --name property to spark-submit should work. -Sandy On Thu, Dec 11, 2014 at 10:28 AM, Tomer Benyamini tomer@gmail.com wrote: On Thu, Dec 11, 2014 at 8:27 PM, Tomer Benyamini tomer@gmail.com wrote: Hi, I'm trying to set a custom spark app name when running a java spark app in yarn-cluster mode. SparkConf sparkConf = new SparkConf(); sparkConf.setMaster(System.getProperty(spark.master)); sparkConf.setAppName(myCustomName); sparkConf.set(spark.logConf, true); JavaSparkContext sc = new JavaSparkContext(sparkConf); Apparently this only works when running in yarn-client mode; in yarn-cluster mode the app name is the class name, when viewing the app in the cluster manager UI. Any idea? Thanks, Tomer
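For reference, the full submit command then looks something like this (class and jar names are illustrative):

./bin/spark-submit --master yarn-cluster --name MyCustomAppName --class com.example.MyApp myapp.jar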
Accessing rows of a row in Spark
Hi spark users, Do you know how to access the rows of a row? I have a SchemaRDD called user and registered it as a table with the following schema:
root
|-- user_id: string (nullable = true)
|-- item: array (nullable = true)
||-- element: struct (containsNull = false)
|||-- item_id: string (nullable = true)
|||-- name: string (nullable = true)
val items = sqlContext.sql("select items from user where user_id = 1").first
The type of items is org.apache.spark.sql.Row. I want to iterate through the items and count how many items user_id = 1 has. I could not find a method with which I can do that. The farthest I can get is converting it via items.toSeq. The type information I got back is:
scala> items.toSeq
res57: Seq[Any] = [WrappedArray([1,orange],[2,apple])]
Any suggestion? Best Regards, Jerry
Re: JSON Input files
Underneath the covers, jsonFile uses TextInputFormat, which will split files correctly based on new lines. Thus, there is no fixed maximum size for a json object (other than the fact that it must fit into memory on the executors). On Mon, Dec 15, 2014 at 7:22 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Peter, Thank you for the clarification. Now we need to store each JSON object into one line. Is there any limitation of length of JSON object? So, JSON object will not go to the next line. What will happen if JSON object is a big/huge one? Will it store in a single line in HDFS? What will happen, if JSON object contains BLOB/CLOB value? Is this entire JSON object stores in single line of HDFS? What will happen, if JSON object exceeding the HDFS block size. For example, single JSON object split into two different worker nodes. In this case, How Spark will read this JSON object? Could you please clarify above questions Regards, Rajesh On Mon, Dec 15, 2014 at 6:52 PM, Peter Vandenabeele pe...@vandenabeele.com wrote: On Sat, Dec 13, 2014 at 5:43 PM, Helena Edelson helena.edel...@datastax.com wrote: One solution can be found here: https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#json-datasets As far as I understand, the people.json file is not really a proper json file, but a file documented as: ... JSON files where each line of the files is a JSON object.. This means that is a file with multiple lines, but each line needs to have a fully self-contained JSON object (initially confusing, this will not parse a standard multi-line JSON file). We are working to clarify this in https://github.com/apache/spark/pull/3517 HTH, Peter - Helena @helenaedelson On Dec 13, 2014, at 11:18 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Team, I have a large JSON file in Hadoop. Could you please let me know 1. How to read the JSON file 2. How to parse the JSON file Please share any example program based on Scala Regards, Rajesh -- Peter Vandenabeele http://www.allthingsdata.io http://www.linkedin.com/in/petervandenabeele https://twitter.com/peter_v gsm: +32-478-27.40.69 e-mail: pe...@vandenabeele.com skype: peter_v_be
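To make that concrete, a minimal sketch against a 1.1-era SQLContext; the path and the name field are illustrative, and each line of the input file must hold one complete JSON object:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val people = sqlContext.jsonFile("hdfs:///data/people.json") // one JSON object per line
people.printSchema() // the schema is inferred from the objects
people.registerTempTable("people")
sqlContext.sql("SELECT name FROM people").collect().foreach(println)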
Re: Custom UDTF with Lateral View throws ClassNotFound exception in Spark SQL CLI
Can you add this information to the JIRA? On Mon, Dec 15, 2014 at 10:54 AM, shenghua wansheng...@gmail.com wrote: Hello, I met a problem when using Spark sql CLI. A custom UDTF with lateral view throws ClassNotFound exception. I did a couple of experiments in same environment (spark version 1.1.1): select + same custom UDTF (Passed) select + lateral view + custom UDTF (ClassNotFoundException) select + lateral view + built-in UDTF (Passed) I have done some googling there days and found one related issue ticket of Spark https://issues.apache.org/jira/browse/SPARK-4811 which is about Custom UDTFs not working in Spark SQL. It should be helpful to put actual code here to reproduce the problem. However, corporate regulations might prohibit this. So sorry about this. Directly using explode's source code in a jar will help anyway. Here is a portion of stack print when exception, just in case: java.lang.ClassNotFoundException: XXX at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at org.apache.spark.sql.hive.HiveFunctionFactory$class.createFunction(hiveUdfs.scala:81) at org.apache.spark.sql.hive.HiveGenericUdtf.createFunction(hiveUdfs.scala:247) at org.apache.spark.sql.hive.HiveGenericUdtf.function$lzycompute(hiveUdfs.scala:254) at org.apache.spark.sql.hive.HiveGenericUdtf.function(hiveUdfs.scala:254) at org.apache.spark.sql.hive.HiveGenericUdtf.outputInspectors$lzycompute(hiveUdfs.scala:261) at org.apache.spark.sql.hive.HiveGenericUdtf.outputInspectors(hiveUdfs.scala:260) at org.apache.spark.sql.hive.HiveGenericUdtf.outputDataTypes$lzycompute(hiveUdfs.scala:265) at org.apache.spark.sql.hive.HiveGenericUdtf.outputDataTypes(hiveUdfs.scala:265) at org.apache.spark.sql.hive.HiveGenericUdtf.makeOutput(hiveUdfs.scala:269) at org.apache.spark.sql.catalyst.expressions.Generator.output(generators.scala:60) at org.apache.spark.sql.catalyst.plans.logical.Generate$$anonfun$1.apply(basicOperators.scala:50) at org.apache.spark.sql.catalyst.plans.logical.Generate$$anonfun$1.apply(basicOperators.scala:50) at scala.Option.map(Option.scala:145) at org.apache.spark.sql.catalyst.plans.logical.Generate.generatorOutput(basicOperators.scala:50) at org.apache.spark.sql.catalyst.plans.logical.Generate.output(basicOperators.scala:60) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveChildren$1.apply(LogicalPlan.scala:79) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveChildren$1.apply(LogicalPlan.scala:79) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) the rest is omitted. Thank you. Shenghua -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-UDTF-with-Lateral-View-throws-ClassNotFound-exception-in-Spark-SQL-CLI-tp20689.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Accessing rows of a row in Spark
scala> val items = Row(1 -> "orange", 2 -> "apple")
items: org.apache.spark.sql.catalyst.expressions.Row = [(1,orange),(2,apple)]
If you literally want an iterator, then this:
scala> items.toIterator.count { case (user_id, name) => user_id == 1 }
res0: Int = 1
...else:
scala> items.count { case (user_id, name) => user_id == 1 }
res1: Int = 1
On Mon, Dec 15, 2014 at 11:04 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users, Do you know how to access rows of row? I have a SchemaRDD called user and register it as a table with the following schema: root |-- user_id: string (nullable = true) |-- item: array (nullable = true) ||-- element: struct (containsNull = false) |||-- item_id: string (nullable = true) |||-- name: string (nullable = true) val items=sqlContext.sql(select items from user where user_id = 1).first The type of items is org.apache.spark.sql.Row. I want to iterate through the items and count how many items that user_id = 1 has. I could not find a method in which I can do that. The farthest I can get to is to convert items.toSeq. The type information I got back is: scala items.toSeq res57: Seq[Any] = [WrappedArray([1,orange],[2,apple])] Any suggestion? Best Regards, Jerry
Re: Intermittent test failures
Is it possible that you are starting more than one SparkContext in a single JVM with out stopping previous ones? I'd try testing with Spark 1.2, which will throw an exception in this case. On Mon, Dec 15, 2014 at 8:48 AM, Marius Soutier mps@gmail.com wrote: Hi, I’m seeing strange, random errors when running unit tests for my Spark jobs. In this particular case I’m using Spark SQL to read and write Parquet files, and one error that I keep running into is this one: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2) org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78) org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545) I can only prevent this from happening by using isolated Specs tests thats always create a new SparkContext that is not shared between tests (but there can also be only a single SparkContext per test), and also by using standard SQLContext instead of HiveContext. It does not seem to have anything to do with the actual files that I also create during the test run with SQLContext.saveAsParquetFile. Cheers - Marius PS The full trace: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2) org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78) org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545) org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125) org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88) org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128) org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:232) org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169) org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927) org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155) sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:160) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) ~[spark-core_2.10-1.1.1.jar:1.1.1] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) ~[spark-core_2.10-1.1.1.jar:1.1.1] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) ~[spark-core_2.10-1.1.1.jar:1.1.1] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library.jar:na] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[scala-library.jar:na] at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) ~[spark-core_2.10-1.1.1.jar:1.1.1] - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
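For reference, a minimal sketch of the one-context-per-suite pattern, written here with ScalaTest (the original poster uses Specs, so this is only an illustration; names are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class ParquetJobSpec extends FunSuite with BeforeAndAfterAll {
  @transient private var sc: SparkContext = _

  override def beforeAll() {
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("test"))
  }

  override def afterAll() {
    if (sc != null) sc.stop() // stop the context so the next suite can create its own
  }

  test("count survives a round trip") {
    assert(sc.parallelize(1 to 10).count() === 10)
  }
}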
Re: Spark metrics for ganglia
Thanks tsingfu, I used this configuration based on your post (with ganglia in unicast mode):
# Enable GangliaSink for all instances
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.host=10.0.0.7
*.sink.ganglia.port=8649
*.sink.ganglia.period=15
*.sink.ganglia.unit=seconds
*.sink.ganglia.ttl=1
*.sink.ganglia.mode=unicast
Then I get the following error:
ERROR metrics.MetricsSystem: Sink class org.apache.spark.metrics.sink.GangliaSink cannot be instantialized java.lang.ClassNotFoundException: org.apache.spark.metrics.sink.GangliaSink
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-metrics-for-ganglia-tp14335p20690.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: printing mllib.linalg.vector
You can use the default toString method to get the string representation. If you want to customize it, check the indices/values fields. -Xiangrui On Fri, Dec 5, 2014 at 7:32 AM, debbie debbielarso...@hotmail.com wrote: Basic question: What is the best way to loop through one of these and print their components? Convert them to an array? Thanks Deb - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
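A minimal sketch of both options (the sparse vector is just an example):

import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors}

val v = Vectors.sparse(5, Array(1, 3), Array(0.5, 2.0))
println(v) // default toString, prints (5,[1,3],[0.5,2.0])
v match { // customized printing via the indices/values fields
  case sv: SparseVector =>
    sv.indices.zip(sv.values).foreach { case (i, x) => println(i + ": " + x) }
  case dv: DenseVector =>
    dv.values.zipWithIndex.foreach { case (x, i) => println(i + ": " + x) }
}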
Re: Accessing rows of a row in Spark
Hi Mark, Thank you for helping out. The items I got back from Spark SQL have the following type information:
scala> items
res16: org.apache.spark.sql.Row = [WrappedArray([1,orange],[2,apple])]
I tried to iterate over the items as you suggested but no luck. Best Regards, Jerry On Mon, Dec 15, 2014 at 2:18 PM, Mark Hamstra m...@clearstorydata.com wrote: scala val items = Row(1 - orange, 2 - apple) items: org.apache.spark.sql.catalyst.expressions.Row = [(1,orange),(2,apple)] If you literally want an iterator, then this: scala items.toIterator.count { case (user_id, name) = user_id == 1 } res0: Int = 1 ...else: scala items.count { case (user_id, name) = user_id == 1 } res1: Int = 1 On Mon, Dec 15, 2014 at 11:04 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users, Do you know how to access rows of row? I have a SchemaRDD called user and register it as a table with the following schema: root |-- user_id: string (nullable = true) |-- item: array (nullable = true) ||-- element: struct (containsNull = false) |||-- item_id: string (nullable = true) |||-- name: string (nullable = true) val items=sqlContext.sql(select items from user where user_id = 1).first The type of items is org.apache.spark.sql.Row. I want to iterate through the items and count how many items that user_id = 1 has. I could not find a method in which I can do that. The farthest I can get to is to convert items.toSeq. The type information I got back is: scala items.toSeq res57: Seq[Any] = [WrappedArray([1,orange],[2,apple])] Any suggestion? Best Regards, Jerry
Re: MLlib(Logistic Regression) + Spark Streaming.
If you want to train offline and predict online, you can use the current LR implementation to train a model and then apply model.predict on the dstream. -Xiangrui On Sun, Dec 7, 2014 at 6:30 PM, Nasir Khan nasirkhan.onl...@gmail.com wrote: I am new to spark. Lets say i want to develop a machine learning model. which trained on normal method in MLlib. I want to use that model with classifier Logistic regression and predict the streaming data coming from a file or socket. Streaming data - Logistic Regression - binary label prediction. Is it possible? since there is no streaming logistic regression algo like streaming linear regression. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Logistic-Regression-Spark-Streaming-tp20564.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
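A minimal sketch of that pattern; trainingData (an RDD[LabeledPoint] built offline) and labeledPoints (a DStream[LabeledPoint]) are assumed to exist:

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

val model = LogisticRegressionWithSGD.train(trainingData, 100) // train offline, 100 iterations
val scored = labeledPoints.map(p => (p.label, model.predict(p.features))) // score each batch
scored.print()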
Re: MLLIb: Linear regression: Loss was due to java.lang.ArrayIndexOutOfBoundsException
Is it possible that after filtering the feature dimension changed? This may happen if you use LIBSVM format but didn't specify the number of features. -Xiangrui On Tue, Dec 9, 2014 at 4:54 AM, Sameer Tilak ssti...@live.com wrote: Hi All, I was able to run LinearRegressionwithSGD for a largeer dataset ( 2GB sparse). I have now filtered the data and I am running regression on a subset of it (~ 200 MB). I see this error, which is strange since it was running fine with the superset data. Is this a formatting issue (which I doubt) or is this some other issue in data preparation? I confirmed that there is no empty line in my dataset. Any help with this will be highly appreciated. 14/12/08 20:32:03 WARN TaskSetManager: Lost TID 5 (task 3.0:1) 14/12/08 20:32:03 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException java.lang.ArrayIndexOutOfBoundsException: 150323 at breeze.linalg.operators.DenseVector_SparseVector_Ops$$anon$129.apply(SparseVectorOps.scala:231) at breeze.linalg.operators.DenseVector_SparseVector_Ops$$anon$129.apply(SparseVectorOps.scala:216) at breeze.linalg.operators.BinaryRegistry$class.apply(BinaryOp.scala:60) at breeze.linalg.VectorOps$$anon$178.apply(Vector.scala:391) at breeze.linalg.NumericOps$class.dot(NumericOps.scala:83) at breeze.linalg.DenseVector.dot(DenseVector.scala:47) at org.apache.spark.mllib.optimization.LeastSquaresGradient.compute(Gradient.scala:125) at org.apache.spark.mllib.optimization.GradientDescent$$anonfun$runMiniBatchSGD$1$$anonfun$1.apply(GradientDescent.scala:180) at org.apache.spark.mllib.optimization.GradientDescent$$anonfun$runMiniBatchSGD$1$$anonfun$1.apply(GradientDescent.scala:179) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157) at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201) at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838) at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838) at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116) at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
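If the data is in LIBSVM format, pinning the dimension when loading should keep filtered subsets consistent. A minimal sketch (the path is illustrative, and 150324 is only a guess inferred from the out-of-bounds index in the trace above):

import org.apache.spark.mllib.util.MLUtils

val numFeatures = 150324 // must be at least the max feature index + 1
val data = MLUtils.loadLibSVMFile(sc, "hdfs:///data/filtered.libsvm", numFeatures)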
Re: Why KMeans with mllib is so slow ?
Please check the number of partitions after sc.textFile. Use sc.textFile('...', 8) to have at least 8 partitions. -Xiangrui On Tue, Dec 9, 2014 at 4:58 AM, DB Tsai dbt...@dbtsai.com wrote: You just need to use the latest master code without any configuration to get performance improvement from my PR. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Mon, Dec 8, 2014 at 7:53 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: After some investigation, I learned that I can't compare kmeans in mllib with another kmeans implementation directly. The kmeans|| initialization step takes more time than the algorithm implemented in julia for example. There is also the ability to run multiple runs of kmeans algorithm in mllib even by default the number of runs is 1. DB Tsai can you please tell me the configuration you took for the improvement you mention in your pull request. I'd like to run the same benchmark on mnist8m on my computer. Cheers; On Fri, Dec 5, 2014 at 10:34 PM, DB Tsai dbt...@dbtsai.com wrote: Also, are you using the latest master in this experiment? A PR merged into the master couple days ago will spend up the k-means three times. See https://github.com/apache/spark/commit/7fc49ed91168999d24ae7b4cc46fbb4ec87febc1 Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 5, 2014 at 9:36 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: The code is really simple : object TestKMeans { def main(args: Array[String]) { val conf = new SparkConf() .setAppName(Test KMeans) .setMaster(local[8]) .set(spark.executor.memory, 8g) val sc = new SparkContext(conf) val numClusters = 500; val numIterations = 2; val data = sc.textFile(sample.csv).map(x = Vectors.dense(x.split(',').map(_.toDouble))) data.cache() val clusters = KMeans.train(data, numClusters, numIterations) println(clusters.clusterCenters.size) val wssse = clusters.computeCost(data) println(serror : $wssse) } } For the testing purpose, I was generating a sample random data with julia and store it in a csv file delimited by comma. The dimensions is 248000 x 384. In the target application, I will have more than 248k data to cluster. On Fri, Dec 5, 2014 at 6:03 PM, Davies Liu dav...@databricks.com wrote: Could you post you script to reproduce the results (also how to generate the dataset)? That will help us to investigate it. On Fri, Dec 5, 2014 at 8:40 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hmm, here I use spark on local mode on my laptop with 8 cores. The data is on my local filesystem. Event thought, there an overhead due to the distributed computation, I found the difference between the runtime of the two implementations really, really huge. Is there a benchmark on how well the algorithm implemented in mllib performs ? On Fri, Dec 5, 2014 at 4:56 PM, Sean Owen so...@cloudera.com wrote: Spark has much more overhead, since it's set up to distribute the computation. Julia isn't distributed, and so has no such overhead in a completely in-core implementation. You generally use Spark when you have a problem large enough to warrant distributing, or, your data already lives in a distributed store like HDFS. But it's also possible you're not configuring the implementations the same way, yes. There's not enough info here really to say. On Fri, Dec 5, 2014 at 9:50 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to a run clustering with kmeans algorithm. The size of my data set is about 240k vectors of dimension 384. 
Solving the problem with the kmeans available in julia (kmean++) http://clusteringjl.readthedocs.org/en/latest/kmeans.html take about 8 minutes on a single core. Solving the same problem with spark kmean|| take more than 1.5 hours with 8 cores Either they don't implement the same algorithm either I don't understand how the kmeans in spark works. Is my data not big enough to take full advantage of spark ? At least, I expect to the same runtime. Cheers, Jao - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
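Applied to the snippet above, Xiangrui's suggestion would look roughly like this (imports as in the original code; the second argument to textFile requests a minimum number of partitions):

val data = sc.textFile("sample.csv", 8)
  .map(x => Vectors.dense(x.split(',').map(_.toDouble)))
data.cache()
println(data.partitions.length) // verify: should be at least 8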
Re: Stack overflow Error while executing spark SQL
Could you post the full stacktrace? It seems to be some recursive call in parsing. -Xiangrui On Tue, Dec 9, 2014 at 7:44 PM, jishnu.prat...@wipro.com wrote: Hi I am getting Stack overflow Error Exception in main java.lang.stackoverflowerror scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) while executing the following code sqlContext.sql(SELECT text FROM tweetTable LIMIT 10).collect().foreach(println) The complete code is from github https://github.com/databricks/reference-apps/blob/master/twitter_classifier/scala/src/main/scala/com/databricks/apps/twitter_classifier/ExamineAndTrain.scala import com.google.gson.{GsonBuilder, JsonParser} import org.apache.spark.mllib.clustering.KMeans import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.clustering.KMeans /** * Examine the collected tweets and trains a model based on them. */ object ExamineAndTrain { val jsonParser = new JsonParser() val gson = new GsonBuilder().setPrettyPrinting().create() def main(args: Array[String]) { // Process program arguments and set properties /*if (args.length 3) { System.err.println(Usage: + this.getClass.getSimpleName + tweetInput outputModelDir numClusters numIterations) System.exit(1) } * */ val outputModelDir=C:\\MLModel val tweetInput=C:\\MLInput val numClusters=10 val numIterations=20 //val Array(tweetInput, outputModelDir, Utils.IntParam(numClusters), Utils.IntParam(numIterations)) = args val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster(local[4]) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) // Pretty print some of the tweets. val tweets = sc.textFile(tweetInput) println(Sample JSON Tweets---) for (tweet - tweets.take(5)) { println(gson.toJson(jsonParser.parse(tweet))) } val tweetTable = sqlContext.jsonFile(tweetInput).cache() tweetTable.registerTempTable(tweetTable) println(--Tweet table Schema---) tweetTable.printSchema() println(Sample Tweet Text-) sqlContext.sql(SELECT text FROM tweetTable LIMIT 10).collect().foreach(println) println(--Sample Lang, Name, text---) sqlContext.sql(SELECT user.lang, user.name, text FROM tweetTable LIMIT 1000).collect().foreach(println) println(--Total count by languages Lang, count(*)---) sqlContext.sql(SELECT user.lang, COUNT(*) as cnt FROM tweetTable GROUP BY user.lang ORDER BY cnt DESC LIMIT 25).collect.foreach(println) println(--- Training the model and persist it) val texts = sqlContext.sql(SELECT text from tweetTable).map(_.head.toString) // Cache the vectors RDD since it will be used for all the KMeans iterations. val vectors = texts.map(Utils.featurize).cache() vectors.count() // Calls an action on the RDD to populate the vectors cache. 
val model = KMeans.train(vectors, numClusters, numIterations) sc.makeRDD(model.clusterCenters, numClusters).saveAsObjectFile(outputModelDir) val some_tweets = texts.take(100) println(Example tweets from the clusters) for (i - 0 until numClusters) { println(s\nCLUSTER $i:) some_tweets.foreach { t = if (model.predict(Utils.featurize(t)) == i) { println(t) } } } } } Thanks Regards Jishnu Menath Prathap - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Accessing rows of a row in Spark
Looks like you've got one more layer of containment than you intend -- i.e. you've got Row[WrappedArray[Row[(Int, String)]]] where you want Row[Row[(Int, String)]]. That's easy to do if somewhere along the line you did something like `val row = Row(collection)` instead of `val row = Row.fromSeq(collection)`. On Mon, Dec 15, 2014 at 11:47 AM, Jerry Lam chiling...@gmail.com wrote: Hi Mark, Thank you for helping out. The items I got back from Spark SQL has the type information as follows: scala items res16: org.apache.spark.sql.Row = [WrappedArray([1,orange],[2,apple])] I tried to iterate the items as you suggested but no luck. Best Regards, Jerry On Mon, Dec 15, 2014 at 2:18 PM, Mark Hamstra m...@clearstorydata.com wrote: scala val items = Row(1 - orange, 2 - apple) items: org.apache.spark.sql.catalyst.expressions.Row = [(1,orange),(2,apple)] If you literally want an iterator, then this: scala items.toIterator.count { case (user_id, name) = user_id == 1 } res0: Int = 1 ...else: scala items.count { case (user_id, name) = user_id == 1 } res1: Int = 1 On Mon, Dec 15, 2014 at 11:04 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users, Do you know how to access rows of row? I have a SchemaRDD called user and register it as a table with the following schema: root |-- user_id: string (nullable = true) |-- item: array (nullable = true) ||-- element: struct (containsNull = false) |||-- item_id: string (nullable = true) |||-- name: string (nullable = true) val items=sqlContext.sql(select items from user where user_id = 1).first The type of items is org.apache.spark.sql.Row. I want to iterate through the items and count how many items that user_id = 1 has. I could not find a method in which I can do that. The farthest I can get to is to convert items.toSeq. The type information I got back is: scala items.toSeq res57: Seq[Any] = [WrappedArray([1,orange],[2,apple])] Any suggestion? Best Regards, Jerry
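Putting that together with the schema in the original mail, a hedged sketch of unwrapping the extra layer (assumes 1.1-era Spark SQL; the cast is the key step):

val row = sqlContext.sql("select items from user where user_id = 1").first()
val items = row(0).asInstanceOf[Seq[org.apache.spark.sql.Row]] // the WrappedArray of item structs
println(items.size) // how many items user_id = 1 has
items.foreach(item => println(item.getString(0) + " -> " + item.getString(1))) // item_id -> name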
Re: what is the best way to implement mini batches?
I'm a little confused by some of the responses. It seems like there are two different issues being discussed here: 1. How to turn a sequential algorithm into something that works on Spark, e.g. deal with the fact that data is split into partitions which are processed in parallel (though within a partition, data is processed sequentially). I'm guessing folks are particularly interested in online machine learning algos, which often have a point update and a mini batch update. 2. How to take a one-point-at-a-time view of the data and convert it into a mini-batches view of the data. (2) is pretty straightforward, e.g. with iterator.grouped(batchSize), or manually putting data into your own buffer, etc. This works for creating mini batches *within* one partition in the context of Spark. But problem (1) is completely separate, and there is no general solution. It really depends on the specifics of what you're trying to do. Some of the suggestions on this thread seem like they are basically just falling back to sequential data processing ... but really inefficient sequential processing. E.g. it doesn't make sense to do a full scan of your data with Spark and ignore all the records but the few that are in the next mini batch. It's completely reasonable to just sequentially process all the data if that works for you. But then it doesn't make sense to use Spark; you're not gaining anything from it. Hope this helps, apologies if I just misunderstood the other suggested solutions. On Dec 14, 2014 8:35 PM, Earthson earthson...@gmail.com wrote: I think it could be done like: 1. using mapPartition to randomly drop some partition 2. drop some elements randomly(for selected partition) 3. calculate gradient step for selected elements I don't think fixed step is needed, but fixed step could be done: 1. zipWithIndex 2. create ShuffleRDD based on the index(eg. using index/10 as key) 3. using mapPartition to calculate each bach I also have a question: Can mini batches run in parallel? I think parallel all batches just like a full batch GD in some case. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20677.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
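A minimal sketch of point (2), forming mini batches within each partition; the data is a toy RDD and batch.sum stands in for whatever per-batch update you actually compute:

val data = sc.parallelize(1 to 1000, 4).map(_.toDouble) // toy data, 4 partitions
val batchSize = 128 // illustrative
val perBatchResults = data.mapPartitions { iter =>
  iter.grouped(batchSize).map(batch => batch.sum) // one result per mini batch
}
perBatchResults.collect()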
Re: Building Desktop application for ALS-MlLib/ Training ALS
On Sun, Dec 14, 2014 at 3:06 AM, Saurabh Agrawal saurabh.agra...@markit.com wrote: Hi, I am a new bee in spark and scala world I have been trying to implement Collaborative filtering using MlLib supplied out of the box with Spark and Scala I have 2 problems 1. The best model was trained with rank = 20 and lambda = 5.0, and numIter = 10, and its RMSE on the test set is 25.718710831912485. The best model improves the baseline by 18.29%. Is there a scientific way in which RMSE could be brought down? What is a decent acceptable value for RMSE? The grid search approach used in the AMPCamp tutorial is pretty standard. Whether an RMSE is good or not really depends on your dataset. 2. I picked up the Collaborative filtering algorithm from http://ampcamp.berkeley.edu/5/exercises/movie-recommendation-with-mllib.html and executed the given code with my dataset. Now, I want to build a desktop application around it. a. What is the best language to do this Java/ Scala? Any possibility to do this using C#? We support Java/Scala/Python. Start with the one you are most familiar with. C# is not supported. b. Can somebody please share any relevant documents/ source or any helper links to help me get started on this? For ALS, you can check the API documentation. Your help is greatly appreciated Thanks!! Regards, Saurabh Agrawal This e-mail, including accompanying communications and attachments, is strictly confidential and only for the intended recipient. Any retention, use or disclosure not expressly authorised by Markit is prohibited. This email is subject to all waivers and other terms at the following link: http://www.markit.com/en/about/legal/email-disclaimer.page Please visit http://www.markit.com/en/about/contact/contact-us.page? for contact information on our offices worldwide. MarkitSERV Limited has its registered office located at Level 4, Ropemaker Place, 25 Ropemaker Street, London, EC2Y 9LY and is authorized and regulated by the Financial Conduct Authority with registration number 207294 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ALS failure with size Integer.MAX_VALUE
Unfortunately, it will depend on the Sorter API in 1.2. -Xiangrui On Mon, Dec 15, 2014 at 11:48 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Xiangrui, The block size limit was encountered even with reduced number of item blocks as you had expected. I'm wondering if I could try the new implementation as a standalone library against a 1.1 deployment. Does it have dependencies on any core API's in the current master? Thanks, Bharath On Wed, Dec 3, 2014 at 10:10 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks Xiangrui. I'll try out setting a smaller number of item blocks. And yes, I've been following the JIRA for the new ALS implementation. I'll try it out when it's ready for testing. . On Wed, Dec 3, 2014 at 4:24 AM, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, You can try setting a small item blocks in this case. 1200 is definitely too large for ALS. Please try 30 or even smaller. I'm not sure whether this could solve the problem because you have 100 items connected with 10^8 users. There is a JIRA for this issue: https://issues.apache.org/jira/browse/SPARK-3735 which I will try to implement in 1.3. I'll ping you when it is ready. Best, Xiangrui On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Yes, the issue appears to be due to the 2GB block size limitation. I am hence looking for (user, product) block sizing suggestions to work around the block size limitation. On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: (It won't be that, since you see that the error occur when reading a block from disk. I think this is an instance of the 2GB block size limitation.) On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Hi Bharath – I'm unsure if this is your problem but the MatrixFactorizationModel in MLLIB which is the underlying component for ALS expects your User/Product fields to be integers. Specifically, the input to ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am wondering if perhaps one of your identifiers exceeds MAX_INT, could you write a quick check for that? I have been running a very similar use case to yours (with more constrained hardware resources) and I haven't seen this exact problem but I'm sure we've seen similar issues. Please let me know if you have other questions. From: Bharath Ravi Kumar reachb...@gmail.com Date: Thursday, November 27, 2014 at 1:30 PM To: user@spark.apache.org user@spark.apache.org Subject: ALS failure with size Integer.MAX_VALUE We're training a recommender with ALS in mllib 1.1 against a dataset of 150M users and 4.5K items, with the total number of training records being 1.2 Billion (~30GB data). The input data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured {number of user data blocks = number of item data blocks}. The number of user/item blocks was varied between 50 to 1200. Irrespective of the block size (e.g.
at 1200 blocks each), there are atleast a couple of tasks that end up shuffle reading 9.7G each in the aggregate stage (ALS.scala:337) and failing with the following exception: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108) at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124) at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
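For reference, a hedged sketch of the small-blocks suggestion from earlier in the thread, using the 1.1-era ALS builder (all values illustrative; ratings is assumed to be an RDD[Rating]):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

val model = new ALS()
  .setRank(10)
  .setIterations(10)
  .setLambda(0.01)
  .setBlocks(30) // sets both user and product blocks; try 30 or smaller
  .run(ratings)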
Re: Building Desktop application for ALS-MlLib/ Training ALS
In case you must write c# code, you can call python code from c# or use IronPython. :) On Mon, Dec 15, 2014 at 12:04 PM, Xiangrui Meng men...@gmail.com wrote: On Sun, Dec 14, 2014 at 3:06 AM, Saurabh Agrawal saurabh.agra...@markit.com wrote: Hi, I am a new bee in spark and scala world I have been trying to implement Collaborative filtering using MlLib supplied out of the box with Spark and Scala I have 2 problems 1. The best model was trained with rank = 20 and lambda = 5.0, and numIter = 10, and its RMSE on the test set is 25.718710831912485. The best model improves the baseline by 18.29%. Is there a scientific way in which RMSE could be brought down? What is a descent acceptable value for RMSE? The grid search approach used in the AMPCamp tutorial is pretty standard. Whether an RMSE is good or not really depends on your dataset. 2. I picked up the Collaborative filtering algorithm from http://ampcamp.berkeley.edu/5/exercises/movie-recommendation-with-mllib.html and executed the given code with my dataset. Now, I want to build a desktop application around it. a. What is the best language to do this Java/ Scala? Any possibility to do this using C#? We support Java/Scala/Python. Start with the one your are most familiar with. C# is not supported. b. Can somebody please share any relevant documents/ source or any helper links to help me get started on this? For ALS, you can check the API documentation. Your help is greatly appreciated Thanks!! Regards, Saurabh Agrawal This e-mail, including accompanying communications and attachments, is strictly confidential and only for the intended recipient. Any retention, use or disclosure not expressly authorised by Markit is prohibited. This email is subject to all waivers and other terms at the following link: http://www.markit.com/en/about/legal/email-disclaimer.page Please visit http://www.markit.com/en/about/contact/contact-us.page? for contact information on our offices worldwide. MarkitSERV Limited has its registered office located at Level 4, Ropemaker Place, 25 Ropemaker Street, London, EC2Y 9LY and is authorized and regulated by the Financial Conduct Authority with registration number 207294 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Abhi Basu
Re: java.lang.IllegalStateException: unread block data
Restore meant rebooting the slave node with an unchanged IP. The funny thing is that for small files Spark works fine. I also checked Hadoop with HDFS and I'm able to run wordcount on it without any problems (i.e. a file about 50GB in size). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalStateException-unread-block-data-tp20668p20692.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ERROR YarnClientClusterScheduler: Lost executor Akka client disassociated
Hi Muhammad, Maybe next time you can use http://pastebin.com/ to format and paste a cleaner Scala code snippet so others can help you more easily. Also, please paste only the significant portion of the stack trace that causes the issue instead of giant logs. First of all, in your log it seems that you ran out of memory, and I guess the problem is that you are trying to cache the whole `clickstreamRDD`. Since you are not necessarily using it that many times, you may not need to cache it for better performance. Or at least, your storage persistence should be `disk and memory` to avoid running out of memory. Secondly, `groupByKey` is very expensive here. It's probably not the root cause of why the job never finishes, but `groupByKey` will shuffle all the data to the reducers. In your case, you can do the filter first, which will be executed in parallel on the mapper side, and then do `groupByKey`. You can also specify a higher number of tasks when you do `groupByKey`. I'd recommend you find a way to write your logic using `reduceByKey` or `combineByKey` to yield much better performance, since those two operations can reduce or combine the data on the mapper side, which leads to much less shuffle traffic. Finally, you may want to break down which part of your code causes the issue to make debugging easier. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Thu, Dec 11, 2014 at 4:48 AM, Muhammad Ahsan muhammad.ah...@gmail.com wrote: -- Code -- scala import org.apache.spark.SparkContext._ import org.apache.spark.SparkContext._ scala import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD scala import org.apache.spark.sql.SchemaRDD import org.apache.spark.sql.SchemaRDD scala import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.HiveContext scala import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.{SparkConf, SparkContext} scala val hiveContext: HiveContext = new HiveContext(sc) hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@2de76244 scala val numDays = 2 numDays: Int = 2 scala case class Click( /* about 20 fields of type STRING */ ) defined class Click scala val inputRDD = new Array[SchemaRDD](numDays) inputRDD: Array[org.apache.spark.sql.SchemaRDD] = Array(null, null) scala for (i - 1 to numDays) { | if (i 10) { | inputRDD(i - 1) = hiveContext.parquetFile(hdfs:// + i) | } else { | inputRDD(i - 1) = hiveContext.parquetFile(hdfs:// + i) | } | | } SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder. SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
scala var unionRDD = inputRDD(1) unionRDD: org.apache.spark.sql.SchemaRDD = SchemaRDD[1] at RDD at SchemaRDD.scala:104 scala for (i - 1 to inputRDD.length - 1) { | unionRDD = unionRDD.unionAll(inputRDD(i)) | } scala val inputRDD = unionRDD inputRDD: org.apache.spark.sql.SchemaRDD = SchemaRDD[2] at RDD at SchemaRDD.scala:104 scala scala inputRDD.registerTempTable(urlInfo) scala val clickstreamRDD = hiveContext.sql(select * from urlInfo + | where guid regexp '^[0-9a-f-]{36}$' + | AND ((callerid 3 AND callerid 1) OR callerid 10 + | OR (callerid=3 AND browsertype = 'IE')) + | AND countrycode regexp '^[A-Z]{2}$') clickstreamRDD: org.apache.spark.sql.SchemaRDD = SchemaRDD[3] at RDD at SchemaRDD.scala:104 scala scala clickstreamRDD.registerTempTable(clickstream) scala clickstreamRDD.cache() res4: clickstreamRDD.type = SchemaRDD[3] at RDD at SchemaRDD.scala:104 scala val guidClickRDD = clickstreamRDD.map(row = (row(7).asInstanceOf[String], { | val value = Click(row(0).asInstanceOf[String], | row(1).asInstanceOf[String], row(2).asInstanceOf[String], | row(3).asInstanceOf[String], row(4).asInstanceOf[String], | row(5).asInstanceOf[String], row(6).asInstanceOf[String], | row(7).asInstanceOf[String], row(8).asInstanceOf[String], | row(9).asInstanceOf[String], row(10).asInstanceOf[String], | row(11).asInstanceOf[String], row(12).asInstanceOf[String], | row(13).asInstanceOf[String], row(14).asInstanceOf[String], | row(15).asInstanceOf[String], row(16).asInstanceOf[String], | row(17).asInstanceOf[String], row(18).asInstanceOf[String],
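A minimal sketch of the two suggestions above, applied to the quoted code (row(7) is the guid per that schema; the task count is illustrative, and what you reduce will depend on your actual logic):

import org.apache.spark.storage.StorageLevel

clickstreamRDD.persist(StorageLevel.MEMORY_AND_DISK) // instead of cache(), if you keep it cached at all
val clicksPerGuid = clickstreamRDD
  .map(row => (row(7).asInstanceOf[String], 1L)) // key by guid
  .reduceByKey(_ + _, 200) // combines map-side before shuffling; 200 reduce tasks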
Re: Including data nucleus tools
Just out of curiosity, did you manually apply this patch and see whether it actually resolves the issue? It seems that it was merged at some point, but reverted because it caused some stability issues. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Sat, Dec 13, 2014 at 7:11 AM, spark.dubovsky.ja...@seznam.cz wrote: So to answer my own question: it is a bug and there is an unmerged PR for it already. https://issues.apache.org/jira/browse/SPARK-2624 https://github.com/apache/spark/pull/3238 Jakub -- Original message -- From: spark.dubovsky.ja...@seznam.cz To: spark.dubovsky.ja...@seznam.cz Date: 12. 12. 2014 15:26:35 Subject: Re: Including data nucleus tools Hi, I had time to try it again. I submitted my app by the same command with these additional options: --jars lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-core-3.2.10.jar,lib/datanucleus-rdbms-3.2.9.jar Now the app successfully creates a hive context. So my question remains: Are the classpath entries from the Spark UI the same classpath as mentioned in the submit script message? Spark assembly has been built with Hive, including Datanucleus jars on classpath If so, then why does the script fail to actually include the datanucleus jars on the classpath? I found no bug about this on jira. Or is there a way how particular yarn/os settings on our cluster override this? Thanks in advance Jakub -- Original message -- From: spark.dubovsky.ja...@seznam.cz To: Michael Armbrust mich...@databricks.com Date: 7. 12. 2014 3:02:33 Subject: Re: Including data nucleus tools Next try. I copied the whole dist directory created by the make-distribution script to the cluster, not just the assembly jar. Then I used ./bin/spark-submit --num-executors 200 --master yarn-cluster --class org.apache.spark.mllib.CreateGuidDomainDictionary ../spark/root-0.1.jar ${args} ...to run the app again. The startup scripts printed this message: Spark assembly has been built with Hive, including Datanucleus jars on classpath ...so I thought I was finally there. But the job started and failed with the same ClassNotFound exception as before. Is the classpath from the script message just the classpath of the driver? Or is it the same classpath that is affected by the --jars option? I was trying to find out from the scripts but I was not able to find where the --jars option is processed. thanks -- Original message -- From: Michael Armbrust mich...@databricks.com To: spark.dubovsky.ja...@seznam.cz Date: 6. 12. 2014 20:39:13 Subject: Re: Including data nucleus tools On Sat, Dec 6, 2014 at 5:53 AM, spark.dubovsky.ja...@seznam.cz wrote: Bonus question: Should the class org.datanucleus.api.jdo.JDOPersistenceManagerFactory be part of assembly? Because it is not in jar now. No these jars cannot be put into the assembly because they have extra metadata files that live in the same location (so if you put them all in an assembly they overwrite each other). This metadata is used in discovery. Instead they must be manually put on the classpath in their original form (usually using --jars). - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: MLLIB model export: PMML vs MLLIB serialization
I am going to try to export decision trees next; so far I have focused on linear models and k-means. Regards, Vincenzo sourabh wrote: Thanks Vincenzo. Are you trying out all the models implemented in mllib? Actually I don't see decision tree there. Sorry if I missed it. When are you planning to merge this to the spark branch? Thanks Sourabh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-model-export-PMML-vs-MLLIB-serialization-tp20324p20693.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: pyspark is crashing in this case. why?
Adding the group back. FYI Genesis - this was on an m3.xlarge with all default settings in Spark. I used Spark version 1.3.0. The 2nd case did work for me: a = [1,2,3,4,5,6,7,8,9] b = [] for x in range(1000000): ... b.append(a) ... rdd1 = sc.parallelize(b) rdd1.first() 14/12/15 16:33:01 WARN TaskSetManager: Stage 1 contains a task of very large size (9766 KB). The maximum recommended task size is 100 KB. [1, 2, 3, 4, 5, 6, 7, 8, 9] On Mon, Dec 15, 2014 at 1:33 PM, Sameer Farooqui same...@databricks.com wrote: Hi Genesis, The 2nd case did work for me: a = [1,2,3,4,5,6,7,8,9] b = [] for x in range(1000000): ... b.append(a) ... rdd1 = sc.parallelize(b) rdd1.first() 14/12/15 16:33:01 WARN TaskSetManager: Stage 1 contains a task of very large size (9766 KB). The maximum recommended task size is 100 KB. [1, 2, 3, 4, 5, 6, 7, 8, 9] On Sun, Dec 14, 2014 at 2:13 PM, Genesis Fatum genesis.fa...@gmail.com wrote: Hi Sameer, I have tried multiple configurations. For example, executor and driver memory at 2G. Also played with the JRE memory size parameters (-Xms) and get the same error. Does it work for you? I think it is a setup issue on my side, although I have tried a couple laptops. Thanks On Sun, Dec 14, 2014 at 1:11 PM, Sameer Farooqui same...@databricks.com wrote: How much executor-memory are you setting for the JVM? What about the Driver JVM memory? Also check the Windows Event Log for Out of memory errors for one of the 2 above JVMs. On Dec 14, 2014 6:04 AM, genesis fatum genesis.fa...@gmail.com wrote: Hi, My environment is: standalone spark 1.1.1 on windows 8.1 pro. The following case works fine: a = [1,2,3,4,5,6,7,8,9] b = [] for x in range(100000): ... b.append(a) ... rdd1 = sc.parallelize(b) rdd1.first() [1, 2, 3, 4, 5, 6, 7, 8, 9] The following case does not work. The only difference is the size of the array. Note the loop range: 100K vs. 1M. a = [1,2,3,4,5,6,7,8,9] b = [] for x in range(1000000): ... b.append(a) ... rdd1 = sc.parallelize(b) rdd1.first() 14/12/14 07:52:19 ERROR PythonRDD: Python worker exited unexpectedly (crashed) java.net.SocketException: Connection reset by peer: socket write error at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(Unknown Source) at java.net.SocketOutputStream.write(Unknown Source) at java.io.BufferedOutputStream.flushBuffer(Unknown Source) at java.io.BufferedOutputStream.write(Unknown Source) at java.io.DataOutputStream.write(Unknown Source) at java.io.FilterOutputStream.write(Unknown Source) at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:341) at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:339) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:339) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1364) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183) What I have tried: 1. Replaced JRE 32bit with JRE64 2. Multiple configurations when I start pyspark: --driver-memory, --executor-memory 3.
Tried to set the SparkConf with different settings 4. Tried also with spark 1.1.0 Being new to Spark, I am sure that it is something simple that I am missing and would appreciate any thoughts. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-is-crashing-in-this-case-why-tp20675.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Stop streaming context gracefully when SIGTERM is passed
Hi all, We are using Spark Streaming ETL a large volume of time series datasets. In our current design, each dataset we ETL will have a corresponding Spark Streaming context + process running on our cluster. Each of these processes will be passed configuration options specifying the data source to process as well as various tuning parameters such as the number of Receiver objects to use, batch interval size, number of partitions, etc. Since the volume of data we're ingesting for each dataset will fluctuate over time, we'd like to be able to regularly send a SIGTERM to the Spark Streaming process handling the ETL, have that process gracefully complete processing any in-flight data, and restart the process with updated configuration options. The most obvious solution seems to be to call the stop(stopSparkContext: Boolean, stopGracefully: Boolean) method provided by StreamingContext in a shutdown hook, but this approach doesn't seem to be working for me. Here's a rough idea of what my code looks like: val ssc = new StreamingContext(conf, Seconds(15)) ... // Add shutdown hook to exit gracefully upon termination. Runtime.getRuntime().addShutdownHook(new Thread() extends Logging { override def run() = { logInfo(Exiting gracefully...) ssc.stop(true, true) } }) ... ssc.start() ssc.awaitTermination() Whenever I try to kill the process, I don't see the Exiting gracefully… log message I've added. I tried grokking through the Spark source code to see if some other shutdown hook might be squashing the hook I've added by causing the process to exit before this hook is invoked, but I haven't found anything that would cause concern yet. Does anybody have any advice or insight on this? I'm a bit of a novice when it comes to the JVM and I'm afraid that I'm reaching the limits of my diagnostic abilities here. Thanks, Adam
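For reference, here is a minimal, compilable sketch of the shutdown-hook approach described above (the archive strips quotes and the anonymous-class syntax from the original snippet). The app name is assumed, `conf` is just a plain SparkConf, and as the thread itself shows, this hook may still race with Spark's own shutdown handling:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("GracefulStopExample") // assumed app name
val ssc = new StreamingContext(conf, Seconds(15))

// Register a JVM shutdown hook; on SIGTERM the JVM runs it before exiting.
sys.addShutdownHook {
  println("Exiting gracefully...")
  // stopSparkContext = true, stopGracefully = true: finish in-flight batches first.
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}

ssc.start()
ssc.awaitTermination()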
Re: Intermittent test failures
Ok, maybe these test versions will help me then. I’ll check it out. On 15.12.2014, at 22:33, Michael Armbrust mich...@databricks.com wrote: Using a single SparkContext should not cause this problem. In the SQL tests we use TestSQLContext and TestHive which are global singletons for all of our unit testing. On Mon, Dec 15, 2014 at 1:27 PM, Marius Soutier mps@gmail.com wrote: Possible, yes, although I’m trying everything I can to prevent it, i.e. fork in Test := true and isolated. Can you confirm that reusing a single SparkContext for multiple tests poses a problem as well? Other than that, just switching from SQLContext to HiveContext also provoked the error. On 15.12.2014, at 20:22, Michael Armbrust mich...@databricks.com wrote: Is it possible that you are starting more than one SparkContext in a single JVM with out stopping previous ones? I'd try testing with Spark 1.2, which will throw an exception in this case. On Mon, Dec 15, 2014 at 8:48 AM, Marius Soutier mps@gmail.com wrote: Hi, I’m seeing strange, random errors when running unit tests for my Spark jobs. In this particular case I’m using Spark SQL to read and write Parquet files, and one error that I keep running into is this one: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2) org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78) org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545) I can only prevent this from happening by using isolated Specs tests thats always create a new SparkContext that is not shared between tests (but there can also be only a single SparkContext per test), and also by using standard SQLContext instead of HiveContext. It does not seem to have anything to do with the actual files that I also create during the test run with SQLContext.saveAsParquetFile. 
Cheers - Marius PS The full trace: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2) org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78) org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545) org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125) org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88) org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128) org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:232) org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169) org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927) org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155) sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:160) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) ~[spark-core_2.10-1.1.1.jar:1.1.1]
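For what it's worth, a sketch of the shared-singleton pattern Michael describes, assuming ScalaTest (the trait and suite names are made up for illustration). Combined with `fork in Test := true` in sbt, each test JVM gets exactly one SparkContext that is stopped once at the end of the suite:

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

// One SparkContext shared by every test in the suite, stopped once in afterAll.
trait SharedSparkContext extends BeforeAndAfterAll { self: FunSuite =>
  @transient var sc: SparkContext = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("test"))
  }

  override def afterAll(): Unit = {
    if (sc != null) sc.stop()
    super.afterAll()
  }
}

class MyJobSuite extends FunSuite with SharedSparkContext {
  test("count") {
    assert(sc.parallelize(1 to 100).count() == 100)
  }
}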
NumberFormatException
Hello, everyone I know 'NumberFormatException' is due to the reason that String can not be parsed properly, but I really can not find any mistakes for my code. I hope someone may kindly help me. My hdfs file is as follows: 8,22 3,11 40,10 49,47 48,29 24,28 50,30 33,56 4,20 30,38 ... So each line contains an integer + , + an integer + \n My code is as follows: object StreamMonitor { def main(args: Array[String]): Unit = { val myFunc = (str: String) = { val strArray = str.trim().split(,) (strArray(0).toInt, strArray(1).toInt) } val conf = new SparkConf().setAppName(StreamMonitor); val ssc = new StreamingContext(conf, Seconds(30)); val datastream = ssc.textFileStream(/user/yu/streaminput); val newstream = datastream.map(myFunc) newstream.saveAsTextFiles(output/, ); ssc.start() ssc.awaitTermination() } } The exception info is: 14/12/15 15:35:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, h3): java.lang.NumberFormatException: For input string: 8 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) java.lang.Integer.parseInt(Integer.java:492) java.lang.Integer.parseInt(Integer.java:527) scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229) scala.collection.immutable.StringOps.toInt(StringOps.scala:31) StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:9) StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:7) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) So based on the above info, 8 is the first number in the file and I think it should be parsed to integer without any problems. I know it may be a very stupid question and the answer may be very easy. But I really can not find the reason. I am thankful to anyone who helps! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NumberFormatException-tp20694.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Why KMeans with mllib is so slow ?
I've tried some additional experiments with kmeans and I finally got it worked as I expected. In fact, the number of partition is critical. I had a data set of 24x784 with 12 partitions. In this case the kmeans algorithm took a very long time (about hours to converge). When I change the partition into 32, the same kmeans ( runs = 10, k = 10, iterations = 300, init = kmeans|| ) converges in 4 min with 8 cores As a comparison, the same problem solve with python scikit-learn takes 21 min on a single core. So spark wins :) As conclusion, setting the number of partition correctly is essential. Is there a rule of thumb for that ? On Mon, Dec 15, 2014 at 8:55 PM, Xiangrui Meng men...@gmail.com wrote: Please check the number of partitions after sc.textFile. Use sc.textFile('...', 8) to have at least 8 partitions. -Xiangrui On Tue, Dec 9, 2014 at 4:58 AM, DB Tsai dbt...@dbtsai.com wrote: You just need to use the latest master code without any configuration to get performance improvement from my PR. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Mon, Dec 8, 2014 at 7:53 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: After some investigation, I learned that I can't compare kmeans in mllib with another kmeans implementation directly. The kmeans|| initialization step takes more time than the algorithm implemented in julia for example. There is also the ability to run multiple runs of kmeans algorithm in mllib even by default the number of runs is 1. DB Tsai can you please tell me the configuration you took for the improvement you mention in your pull request. I'd like to run the same benchmark on mnist8m on my computer. Cheers; On Fri, Dec 5, 2014 at 10:34 PM, DB Tsai dbt...@dbtsai.com wrote: Also, are you using the latest master in this experiment? A PR merged into the master couple days ago will spend up the k-means three times. See https://github.com/apache/spark/commit/7fc49ed91168999d24ae7b4cc46fbb4ec87febc1 Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 5, 2014 at 9:36 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: The code is really simple : object TestKMeans { def main(args: Array[String]) { val conf = new SparkConf() .setAppName(Test KMeans) .setMaster(local[8]) .set(spark.executor.memory, 8g) val sc = new SparkContext(conf) val numClusters = 500; val numIterations = 2; val data = sc.textFile(sample.csv).map(x = Vectors.dense(x.split(',').map(_.toDouble))) data.cache() val clusters = KMeans.train(data, numClusters, numIterations) println(clusters.clusterCenters.size) val wssse = clusters.computeCost(data) println(serror : $wssse) } } For the testing purpose, I was generating a sample random data with julia and store it in a csv file delimited by comma. The dimensions is 248000 x 384. In the target application, I will have more than 248k data to cluster. On Fri, Dec 5, 2014 at 6:03 PM, Davies Liu dav...@databricks.com wrote: Could you post you script to reproduce the results (also how to generate the dataset)? That will help us to investigate it. On Fri, Dec 5, 2014 at 8:40 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hmm, here I use spark on local mode on my laptop with 8 cores. The data is on my local filesystem. Event thought, there an overhead due to the distributed computation, I found the difference between the runtime of the two implementations really, really huge. Is there a benchmark on how well the algorithm implemented in mllib performs ? 
On Fri, Dec 5, 2014 at 4:56 PM, Sean Owen so...@cloudera.com wrote: Spark has much more overhead, since it's set up to distribute the computation. Julia isn't distributed, and so has no such overhead in a completely in-core implementation. You generally use Spark when you have a problem large enough to warrant distributing, or, your data already lives in a distributed store like HDFS. But it's also possible you're not configuring the implementations the same way, yes. There's not enough info here really to say. On Fri, Dec 5, 2014 at 9:50 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to a run clustering with kmeans algorithm. The size of my data set is about 240k vectors of dimension 384. Solving the problem with the kmeans available in julia (kmean++) http://clusteringjl.readthedocs.org/en/latest/kmeans.html take about 8
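As a rough sketch of the partitioning advice above, wired into the test program from this thread. The value 32 is the one reported to work in the thread; as a general (hedged) rule of thumb, the Spark tuning guide suggests roughly 2-3 tasks per CPU core:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Ask textFile for enough partitions up front (e.g. 32 for 8 cores),
// or call repartition() afterwards if the file came in with too few splits.
val data = sc.textFile("sample.csv", 32)
  .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
  .cache()

val clusters = KMeans.train(data, 500, 2) // k = 500, 2 iterations, as in the thread
println(clusters.computeCost(data))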
Re: Stop streaming context gracefully when SIGTERM is passed
Hi Adam, I have following scala actor based code to do graceful shutdown: class TimerActor (val timeout : Long, val who : Actor) extends Actor { def act { reactWithin (timeout) { case TIMEOUT = who ! SHUTDOWN } } } class SSCReactor (val ssc : StreamingContext) extends Actor with Logging { def act { react { case SHUTDOWN = logger.info (sShutting down gracefully ...) ssc.stop (true, true) } } } I see following message: 14/10/22 01:40:49 INFO SSCReactor: Shutting down gracefully ... 14/10/22 01:40:49 INFO JobGenerator: Stopping JobGenerator gracefully 14/10/22 01:40:49 INFO JobGenerator: Waiting for all received blocks to be consumed for job generation 14/10/22 01:40:49 INFO JobGenerator: Waited for all received blocks to be consumed for job generation -Soumitra. On Mon, Dec 15, 2014 at 1:32 PM, Budde, Adam bu...@amazon.com wrote: Hi all, We are using Spark Streaming ETL a large volume of time series datasets. In our current design, each dataset we ETL will have a corresponding Spark Streaming context + process running on our cluster. Each of these processes will be passed configuration options specifying the data source to process as well as various tuning parameters such as the number of Receiver objects to use, batch interval size, number of partitions, etc. Since the volume of data we're ingesting for each dataset will fluctuate over time, we'd like to be able to regularly send a SIGTERM to the Spark Streaming process handling the ETL, have that process gracefully complete processing any in-flight data, and restart the process with updated configuration options. The most obvious solution seems to be to call the stop(stopSparkContext: Boolean, stopGracefully: Boolean) method provided by StreamingContext in a shutdown hook, but this approach doesn't seem to be working for me. Here's a rough idea of what my code looks like: val ssc = new StreamingContext(conf, Seconds(15)) ... // Add shutdown hook to exit gracefully upon termination. Runtime.getRuntime().addShutdownHook(new Thread() extends Logging { override def run() = { logInfo(Exiting gracefully...) ssc.stop(true, true) } }) ... ssc.start() ssc.awaitTermination() Whenever I try to kill the process, I don't see the Exiting gracefully… log message I've added. I tried grokking through the Spark source code to see if some other shutdown hook might be squashing the hook I've added by causing the process to exit before this hook is invoked, but I haven't found anything that would cause concern yet. Does anybody have any advice or insight on this? I'm a bit of a novice when it comes to the JVM and I'm afraid that I'm reaching the limits of my diagnostic abilities here. Thanks, Adam
Re: NumberFormatException
That certainly looks surprising. Are you sure there are no unprintable characters in the file? On Mon, Dec 15, 2014 at 9:49 PM, yu yuz1...@iastate.edu wrote: The exception info is: 14/12/15 15:35:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, h3): java.lang.NumberFormatException: For input string: 8 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
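One way to check for that (a throwaway snippet, not from the thread): print the code point of every character in the offending field, which makes a BOM or other invisible character show up immediately:

// "8" looks fine when printed, but a leading BOM (U+FEFF) or stray control character would not.
val field = sc.textFile("/user/yu/streaminput").first().split(",")(0)
field.foreach(c => println(f"'$c' -> U+${c.toInt}%04X"))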
Re: Can spark job have sideeffects (write files to FileSystem)
Yes, this is what I also found in Spark documentation, that foreach can have side effects. Nevertheless I have this weird error, that sometimes files are just empty. using is simply a wrapper that takes our code, makes try-catch-finally and flush close all resources. I honestly have no clue what can possibly be wrong. No errors in logs. On Thu, Dec 11, 2014 at 2:29 PM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: Yes, this is perfectly legal. This is what RDD.foreach() is for! You may be encountering an IO exception while writing, and maybe using() suppresses it. (?) I'd try writing the files with java.nio.file.Files.write() -- I'd expect there is less that can go wrong with that simple call. On Thu, Dec 11, 2014 at 12:50 PM, Paweł Szulc paul.sz...@gmail.com wrote: Imagine simple Spark job, that will store each line of the RDD to a separate file val lines = sc.parallelize(1 to 100).map(n = sthis is line $n) lines.foreach(line = writeToFile(line)) def writeToFile(line: String) = { def filePath = file://... val file = new File(new URI(path).getPath) // using function simply closes the output stream using(new FileOutputStream(file)) { output = output.write(value) } } Now, example above works 99,9% of a time. Files are generated for each line, each file contains that particular line. However, when dealing with large number of data, we encounter situations where some of the files are empty! Files are generated, but there is no content inside of them (0 bytes). Now the question is: can Spark job have side effects. Is it even legal to write such code? If no, then what other choice do we have when we want to save data from our RDD? If yes, then do you guys see what could be the reason of this job acting in this strange manner 0.1% of the time? disclaimer: we are fully aware of .saveAsTextFile method in the API, however the example above is a simplification of our code - normally we produce PDF files. Best regards, Paweł Szulc
Re: Can spark job have sideeffects (write files to FileSystem)
Thinking about that any task could be launched concurrently in different nodes, so in order to make sure the generated files are valid, you need some atomic operation (such as rename) to do it. For example, you could generate a random name for output file, writing the data into it, rename it to the target name finally. This is what happened in saveAsTextFile(). On Mon, Dec 15, 2014 at 4:37 PM, Paweł Szulc paul.sz...@gmail.com wrote: Yes, this is what I also found in Spark documentation, that foreach can have side effects. Nevertheless I have this weird error, that sometimes files are just empty. using is simply a wrapper that takes our code, makes try-catch-finally and flush close all resources. I honestly have no clue what can possibly be wrong. No errors in logs. On Thu, Dec 11, 2014 at 2:29 PM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: Yes, this is perfectly legal. This is what RDD.foreach() is for! You may be encountering an IO exception while writing, and maybe using() suppresses it. (?) I'd try writing the files with java.nio.file.Files.write() -- I'd expect there is less that can go wrong with that simple call. On Thu, Dec 11, 2014 at 12:50 PM, Paweł Szulc paul.sz...@gmail.com wrote: Imagine simple Spark job, that will store each line of the RDD to a separate file val lines = sc.parallelize(1 to 100).map(n = sthis is line $n) lines.foreach(line = writeToFile(line)) def writeToFile(line: String) = { def filePath = file://... val file = new File(new URI(path).getPath) // using function simply closes the output stream using(new FileOutputStream(file)) { output = output.write(value) } } Now, example above works 99,9% of a time. Files are generated for each line, each file contains that particular line. However, when dealing with large number of data, we encounter situations where some of the files are empty! Files are generated, but there is no content inside of them (0 bytes). Now the question is: can Spark job have side effects. Is it even legal to write such code? If no, then what other choice do we have when we want to save data from our RDD? If yes, then do you guys see what could be the reason of this job acting in this strange manner 0.1% of the time? disclaimer: we are fully aware of .saveAsTextFile method in the API, however the example above is a simplification of our code - normally we produce PDF files. Best regards, Paweł Szulc - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
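A sketch of the write-then-rename idea in the reply above, applied to the `lines` RDD from the original example. Paths, file naming, and the per-line payload are placeholders, not the poster's actual code:

import java.nio.file.{Files, Paths, StandardCopyOption}
import java.util.UUID

lines.foreach { line =>
  // Write to a uniquely named temp file first...
  val tmp = Paths.get(s"/data/out/.tmp-${UUID.randomUUID()}")
  Files.write(tmp, line.getBytes("UTF-8"))
  // ...then atomically move it to its final name, so readers never observe a partial file
  // and a re-executed task simply overwrites the same target.
  val target = Paths.get(s"/data/out/${line.hashCode}.txt")
  Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE)
}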
Re: what is the best way to implement mini batches?
Hi Imran, you are right. Sequentially process does not make sense to use spark. I think Sequentially process works if batch for each iteration is large enough(this batch could be processed in parallel). My point is that we shall not run mini-batches in parallel, but it still possible to use large batch for parallel inside each batch(It seems to be the way that SGD implemented in MLLib does?). -- Earthson Lu On December 16, 2014 at 04:02:22, Imran Rashid (im...@therashids.com) wrote: I'm a little confused by some of the responses. It seems like there are two different issues being discussed here: 1. How to turn a sequential algorithm into something that works on spark. Eg deal with the fact that data is split into partitions which are processed in parallel (though within a partition, data is processed sequentially). I'm guessing folks are particularly interested in online machine learning algos, which often have a point update and a mini batch update. 2. How to convert a one-point-at-a-time view of the data and convert it into a mini batches view of the data. (2) is pretty straightforward, eg with iterator.grouped (batchSize), or manually put data into your own buffer etc. This works for creating mini batches *within* one partition in the context of spark. But problem (1) is completely separate, and there is no general solution. It really depends the specifics of what you're trying to do. Some of the suggestions on this thread seem like they are basically just falling back to sequential data processing ... but reay inefficient sequential processing. Eg. It doesn't make sense to do a full scan of your data with spark, and ignore all the records but the few that are in the next mini batch. It's completely reasonable to just sequentially process all the data if that works for you. But then it doesn't make sense to use spark, you're not gaining anything from it. Hope this helps, apologies if I just misunderstood the other suggested solutions. On Dec 14, 2014 8:35 PM, Earthson earthson...@gmail.com wrote: I think it could be done like: 1. using mapPartition to randomly drop some partition 2. drop some elements randomly(for selected partition) 3. calculate gradient step for selected elements I don't think fixed step is needed, but fixed step could be done: 1. zipWithIndex 2. create ShuffleRDD based on the index(eg. using index/10 as key) 3. using mapPartition to calculate each bach I also have a question: Can mini batches run in parallel? I think parallel all batches just like a full batch GD in some case. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20677.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
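For point (2) in Imran's reply, the iterator.grouped suggestion looks roughly like this; the batch size, the toy data, and the per-batch computation are placeholders:

val data = sc.parallelize(Seq.fill(100000)(math.random), 8) // toy data, 8 partitions
val batchSize = 1000 // assumed

val perBatchResults = data.mapPartitions { iter =>
  // grouped() turns the per-partition record iterator into an iterator of mini batches;
  // batches are processed sequentially within a partition, partitions in parallel.
  iter.grouped(batchSize).map(batch => batch.sum / batch.size) // placeholder per-batch step
}
perBatchResults.collect()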
Re: NotSerializableException in Spark Streaming
This still seems to be broken. In 1.1.1, it errors immediately on this line (from the above repro script): liveTweets.map(t = noop(t)).print() The stack trace is: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1264) at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438) at $iwC$$iwC$$iwC$$iwC.init(console:27) at $iwC$$iwC$$iwC.init(console:32) at $iwC$$iwC.init(console:34) at $iwC.init(console:36) at init(console:38) at .init(console:42) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at
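A common workaround for this kind of NotSerializableException (not from the thread itself, so treat it as an assumption): make sure the function passed to map does not drag the enclosing REPL line, and with it the StreamingContext, into the closure, for example by defining it in a standalone serializable object:

// Defined at the top level, so the closure only captures this small object,
// not the REPL wrapper that also holds the StreamingContext.
object TweetFns extends Serializable {
  def noop[T](t: T): T = t
}

liveTweets.map(t => TweetFns.noop(t)).print()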
Re: NumberFormatException
Hi Yu, Try this : val data = csv.map( line = line.split(,).map(elem = elem.trim)) //lines in rows data.map( rec = (rec(0).toInt, rec(1).toInt)) to convert into integer. On 16 December 2014 at 10:49, yu [via Apache Spark User List] ml-node+s1001560n20694...@n3.nabble.com wrote: Hello, everyone I know 'NumberFormatException' is due to the reason that String can not be parsed properly, but I really can not find any mistakes for my code. I hope someone may kindly help me. My hdfs file is as follows: 8,22 3,11 40,10 49,47 48,29 24,28 50,30 33,56 4,20 30,38 ... So each line contains an integer + , + an integer + \n My code is as follows: object StreamMonitor { def main(args: Array[String]): Unit = { val myFunc = (str: String) = { val strArray = str.trim().split(,) (strArray(0).toInt, strArray(1).toInt) } val conf = new SparkConf().setAppName(StreamMonitor); val ssc = new StreamingContext(conf, Seconds(30)); val datastream = ssc.textFileStream(/user/yu/streaminput); val newstream = datastream.map(myFunc) newstream.saveAsTextFiles(output/, ); ssc.start() ssc.awaitTermination() } } The exception info is: 14/12/15 15:35:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, h3): java.lang.NumberFormatException: For input string: 8 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) java.lang.Integer.parseInt(Integer.java:492) java.lang.Integer.parseInt(Integer.java:527) scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229) scala.collection.immutable.StringOps.toInt(StringOps.scala:31) StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:9) StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:7) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) So based on the above info, 8 is the first number in the file and I think it should be parsed to integer without any problems. I know it may be a very stupid question and the answer may be very easy. But I really can not find the reason. I am thankful to anyone who helps! -- If you reply to this email, your message will be added to the discussion below: http://apache-spark-user-list.1001560.n3.nabble.com/NumberFormatException-tp20694.html To start a new topic under Apache Spark User List, email ml-node+s1001560n1...@n3.nabble.com To unsubscribe from Apache Spark User List, click here http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_codenode=1code=aG5haGFrQHd5bnlhcmRncm91cC5jb218MXwtMTgxOTE5MTkyOQ== . 
-- Regards, Harihar Nahak BigData Developer Wynyard Email:hna...@wynyardgroup.com | Extn: 8019 --Harihar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NumberFormatException-tp20694p20696.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: JSON Input files
Thank you Peter for the clarification. Regards, Rajesh On Tue, Dec 16, 2014 at 12:42 AM, Michael Armbrust mich...@databricks.com wrote: Underneath the covers, jsonFile uses TextInputFormat, which will split files correctly based on new lines. Thus, there is no fixed maximum size for a json object (other than the fact that it must fit into memory on the executors). On Mon, Dec 15, 2014 at 7:22 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Peter, Thank you for the clarification. Now we need to store each JSON object into one line. Is there any limitation of length of JSON object? So, JSON object will not go to the next line. What will happen if JSON object is a big/huge one? Will it store in a single line in HDFS? What will happen, if JSON object contains BLOB/CLOB value? Is this entire JSON object stores in single line of HDFS? What will happen, if JSON object exceeding the HDFS block size. For example, single JSON object split into two different worker nodes. In this case, How Spark will read this JSON object? Could you please clarify above questions Regards, Rajesh On Mon, Dec 15, 2014 at 6:52 PM, Peter Vandenabeele pe...@vandenabeele.com wrote: On Sat, Dec 13, 2014 at 5:43 PM, Helena Edelson helena.edel...@datastax.com wrote: One solution can be found here: https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#json-datasets As far as I understand, the people.json file is not really a proper json file, but a file documented as: ... JSON files where each line of the files is a JSON object.. This means that is a file with multiple lines, but each line needs to have a fully self-contained JSON object (initially confusing, this will not parse a standard multi-line JSON file). We are working to clarify this in https://github.com/apache/spark/pull/3517 HTH, Peter - Helena @helenaedelson On Dec 13, 2014, at 11:18 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Team, I have a large JSON file in Hadoop. Could you please let me know 1. How to read the JSON file 2. How to parse the JSON file Please share any example program based on Scala Regards, Rajesh -- Peter Vandenabeele http://www.allthingsdata.io http://www.linkedin.com/in/petervandenabeele https://twitter.com/peter_v gsm: +32-478-27.40.69 e-mail: pe...@vandenabeele.com skype: peter_v_be
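A small sketch of the newline-delimited JSON convention being described; the file contents, path, and field names are made up:

// people.json -- one complete JSON object per line, e.g.:
//   {"name": "Rajesh", "age": 30}
//   {"name": "Peter", "age": 40}

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val people = sqlContext.jsonFile("hdfs:///data/people.json") // schema is inferred per line
people.printSchema()
people.registerTempTable("people")
sqlContext.sql("SELECT name FROM people WHERE age > 35").collect().foreach(println)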
Re: Executor memory
Hi Pala, Spark executors only reserve spark.storage.memoryFraction (default 0.6) of their spark.executor.memory for caching RDDs. The spark UI displays this fraction. spark.executor.memory controls the executor heap size. spark.yarn.executor.memoryOverhead controls the extra that's tacked on for the container memory. -Sandy On Dec 15, 2014, at 7:53 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, Running Spark 1.0.1 on Yarn 2.5 When i specify --executor-memory 4g, the spark UI shows each executor as having only 2.3 GB, and similarly for 8g, only 4.6 GB. I am guessing that the executor memory corresponds to the container memory, and that the task JVM gets only a percentage of the container total memory. Is there a yarn or spark parameter to tune this so that my task JVM actually gets 6GB out of the 8GB for example? Thanks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
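In other words, the number in the UI is roughly heap size times spark.storage.memoryFraction, not the full container size. A hedged sketch of how the knobs fit together; the values are the ones from this thread plus an illustrative overhead, not recommendations:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "8g")               // executor JVM heap (-Xmx)
  .set("spark.storage.memoryFraction", "0.6")       // share of heap reserved for cached RDDs
  .set("spark.yarn.executor.memoryOverhead", "512") // extra MB added to the YARN container (assumed value)

// Roughly: UI "memory" ~= 8g * 0.6 ~= 4.8g (a bit less in practice, since the fraction
// is applied to the JVM's actual max heap), which lines up with the ~4.6 GB observed.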
Fetch Failed caused job failed.
While I was running spark MR job, there was FetchFailed(BlockManagerId(47, xx.com, 40975, 0), shuffleId=2, mapId=5, reduceId=286), then there were many retries, and the job failed finally. And the log showed the following error, does anybody meet this error ? or is it a known issue in Spark ? Thanks. 4/12/16 10:43:43 ERROR PythonRDD: Python worker exited unexpectedly (crashed) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /home/spark/spark-1.1/python/pyspark/worker.py, line 75, in main command = pickleSer._read_with_length(infile) File /home/spark/spark-1.1/python/pyspark/serializers.py, line 146, in _read_with_length length = read_int(stream) File /home/spark/spark-1.1/python/pyspark/serializers.py, line 464, in read_int raise EOFError EOFError at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:154) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) Caused by: org.apache.spark.shuffle.FetchFailedException: Fetch failed: BlockManagerId(47, nmg01-taihang-d11609.nmg01.baidu.com, 40975, 0) 2 5 286 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:68) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:335) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183) 14/12/16 10:43:43 ERROR PythonRDD: This may have been caused by a prior exception: 
org.apache.spark.shuffle.FetchFailedException: Fetch failed: BlockManagerId(47, nmg01-taihang-d11609.nmg01.baidu.com, 40975, 0) 2 5 286 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:68) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
Re: ALS failure with size Integer.MAX_VALUE
Ok. We'll try using it in a test cluster running 1.2. On 16-Dec-2014 1:36 am, Xiangrui Meng men...@gmail.com wrote: Unfortunately, it will depends on the Sorter API in 1.2. -Xiangrui On Mon, Dec 15, 2014 at 11:48 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Xiangrui, The block size limit was encountered even with reduced number of item blocks as you had expected. I'm wondering if I could try the new implementation as a standalone library against a 1.1 deployment. Does it have dependencies on any core API's in the current master? Thanks, Bharath On Wed, Dec 3, 2014 at 10:10 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks Xiangrui. I'll try out setting a smaller number of item blocks. And yes, I've been following the JIRA for the new ALS implementation. I'll try it out when it's ready for testing. . On Wed, Dec 3, 2014 at 4:24 AM, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, You can try setting a small item blocks in this case. 1200 is definitely too large for ALS. Please try 30 or even smaller. I'm not sure whether this could solve the problem because you have 100 items connected with 10^8 users. There is a JIRA for this issue: https://issues.apache.org/jira/browse/SPARK-3735 which I will try to implement in 1.3. I'll ping you when it is ready. Best, Xiangrui On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Yes, the issue appears to be due to the 2GB block size limitation. I am hence looking for (user, product) block sizing suggestions to work around the block size limitation. On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: (It won't be that, since you see that the error occur when reading a block from disk. I think this is an instance of the 2GB block size limitation.) On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Hi Bharath – I’m unsure if this is your problem but the MatrixFactorizationModel in MLLIB which is the underlying component for ALS expects your User/Product fields to be integers. Specifically, the input to ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am wondering if perhaps one of your identifiers exceeds MAX_INT, could you write a quick check for that? I have been running a very similar use case to yours (with more constrained hardware resources) and I haven’t seen this exact problem but I’m sure we’ve seen similar issues. Please let me know if you have other questions. From: Bharath Ravi Kumar reachb...@gmail.com Date: Thursday, November 27, 2014 at 1:30 PM To: user@spark.apache.org user@spark.apache.org Subject: ALS failure with size Integer.MAX_VALUE We're training a recommender with ALS in mllib 1.1 against a dataset of 150M users and 4.5K items, with the total number of training records being 1.2 Billion (~30GB data). The input data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured {number of user data blocks = number of item data blocks}. The number of user/item blocks was varied between 50 to 1200. Irrespective of the block size (e.g. 
at 1200 blocks each), there are atleast a couple of tasks that end up shuffle reading 9.7G each in the aggregate stage (ALS.scala:337) and failing with the following exception: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108) at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124) at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
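Concretely, the number of blocks Xiangrui refers to is the blocks argument to ALS.train (it sets both user and product blocks). A sketch with the values discussed in the thread; the input RDD, iterations, and lambda are assumptions:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// trainingRecords: assumed RDD[(Int, Int, Double)] of (user, item, rating) triples
val ratings = trainingRecords.map { case (u, p, r) => Rating(u, p, r) }

// rank = 10 as in the thread; blocks = 30 (small) rather than 1200, per the suggestion above;
// iterations = 20 and lambda = 0.01 are illustrative only.
val model = ALS.train(ratings, 10, 20, 0.01, 30)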
Fwd: Run Spark job on Playframework + Spark Master/Worker in one Mac
Hi Aniket, Thanks for your reply. I followed your advice to modified my code. Here is latest one. https://github.com/TomoyaIgarashi/spark_cluster_sample/commit/ce7613c42d3adbe6ae44e264c11f3829460f3c35 As a result, It works correctly! Thank you very much. But, AssociationError Message appears line 397 in Playframework logs as follows. https://gist.github.com/TomoyaIgarashi/9688bdd5663af95ddd4d Is there any problem? 2014-12-15 18:48 GMT+09:00 Aniket Bhatnagar aniket.bhatna...@gmail.com: Try the workaround (addClassPathJars(sparkContext, this.getClass.getClassLoader) discussed in http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3CCAJOb8buD1B6tUtOfG8_Ok7F95C3=r-zqgffoqsqbjdxd427...@mail.gmail.com%3E Thanks, Aniket On Mon Dec 15 2014 at 07:43:24 Tomoya Igarashi tomoya.igarashi.0...@gmail.com wrote: Hi all, I am trying to run Spark job on Playframework + Spark Master/Worker in one Mac. When job ran, I encountered java.lang.ClassNotFoundException. Would you teach me how to solve it? Here is my code in Github. https://github.com/TomoyaIgarashi/spark_cluster_sample * Envrionments: Mac 10.9.5 Java 1.7.0_71 Playframework 2.2.3 Spark 1.1.1 * Setup history: cd ~ git clone g...@github.com:apache/spark.git cd spark git checkout -b v1.1.1 v1.1.1 sbt/sbt assembly vi ~/.bashrc export SPARK_HOME=/Users/tomoya/spark . ~/.bashrc hostname Tomoya-Igarashis-MacBook-Air.local vi $SPARK_HOME/conf/slaves Tomoya-Igarashis-MacBook-Air.local play new spark_cluster_sample default name type - scala * Run history: $SPARK_HOME/sbin/start-all.sh jps which play /Users/tomoya/play/play git clone https://github.com/TomoyaIgarashi/spark_cluster_sample cd spark_cluster_sample play run * Error trace: Here is error trace in Gist. https://gist.github.com/TomoyaIgarashi/4bd45ab3685a532f5511 Regards
Can I set max execution time for any task in a job?
Is that possible? If not, how would one do it from PySpark? This probably does not make sense in most cases, but I am writing a script where my job involves downloading and pushing data into Cassandra. Sometimes a task hangs forever, and I don't really mind killing it. The job is not actually computing some result that requires all tasks to succeed. Thanks, Mohamed.
RE: is there a way to interact with Spark clusters remotely?
Thanks all for your information! What Pietro mentioned seems to be the appropriate solution. I also found a slide deck http://www.slideshare.net/EvanChan2/spark-summit-2014-spark-job-server-talk talking about it. Several quick questions: 1. Is it already available in the Spark main branch? (seems not, but I am not sure whether it is planned) 2. It seems that the current job server can only submit Java jars (or Scala I guess?) - is there any plan to support Python in the future? Thanks and any information would be appreciated! Xiaoyong From: Pietro Gentile [mailto:pietro.gentil...@gmail.com] Sent: Monday, December 15, 2014 10:33 PM To: Xiaoyong Zhu Subject: R: is there a way to interact with Spark clusters remotely? Hi, try this https://github.com/spark-jobserver/spark-jobserver . Best Regards, Pietro Gentile From: Xiaoyong Zhu [mailto:xiaoy...@microsoft.com] Sent: Monday, December 15, 2014 15:17 To: user@spark.apache.org Subject: is there a way to interact with Spark clusters remotely? Hi experts I am wondering if there is a way to interact with Spark remotely? i.e. no access to clusters required, but submit Python/Scala scripts to the cluster and get results back via (REST) APIs. That would facilitate the development process a lot. Xiaoyong
Re: Run Spark job on Playframework + Spark Master/Worker in one Mac
Seems you are using standalone mode. Can you check spark worker logs or application logs in spark work directory to find any errors? On Tue, Dec 16, 2014, 9:09 AM Tomoya Igarashi tomoya.igarashi.0...@gmail.com wrote: Hi Aniket, Thanks for your reply. I followed your advice to modified my code. Here is latest one. https://github.com/TomoyaIgarashi/spark_cluster_sample/commit/ce7613c42d3adbe6ae44e264c11f3829460f3c35 As a result, It works correctly! Thank you very much. But, AssociationError Message appears line 397 in Playframework logs as follows. https://gist.github.com/TomoyaIgarashi/9688bdd5663af95ddd4d Is there any problem? 2014-12-15 18:48 GMT+09:00 Aniket Bhatnagar aniket.bhatna...@gmail.com: Try the workaround (addClassPathJars(sparkContext, this.getClass.getClassLoader) discussed in http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3CCAJOb8buD1B6tUtOfG8_Ok7F95C3=r-zqgffoqsqbjdxd427...@mail.gmail.com%3E Thanks, Aniket On Mon Dec 15 2014 at 07:43:24 Tomoya Igarashi tomoya.igarashi.0...@gmail.com wrote: Hi all, I am trying to run Spark job on Playframework + Spark Master/Worker in one Mac. When job ran, I encountered java.lang.ClassNotFoundException. Would you teach me how to solve it? Here is my code in Github. https://github.com/TomoyaIgarashi/spark_cluster_sample * Envrionments: Mac 10.9.5 Java 1.7.0_71 Playframework 2.2.3 Spark 1.1.1 * Setup history: cd ~ git clone g...@github.com:apache/spark.git cd spark git checkout -b v1.1.1 v1.1.1 sbt/sbt assembly vi ~/.bashrc export SPARK_HOME=/Users/tomoya/spark . ~/.bashrc hostname Tomoya-Igarashis-MacBook-Air.local vi $SPARK_HOME/conf/slaves Tomoya-Igarashis-MacBook-Air.local play new spark_cluster_sample default name type - scala * Run history: $SPARK_HOME/sbin/start-all.sh jps which play /Users/tomoya/play/play git clone https://github.com/TomoyaIgarashi/spark_cluster_sample cd spark_cluster_sample play run * Error trace: Here is error trace in Gist. https://gist.github.com/TomoyaIgarashi/4bd45ab3685a532f5511 Regards
Re: Can I set max execution time for any task in a job?
There is a spark listener interface https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.scheduler.SparkListener which can be used to trigger events like jobStarted, TaskGotResults etc but i don't think you can set execution time anywhere. If a task is hung, its mostly because of the GC pause (depends on your job), if you can paste the code, then probably we can tell you where the bottleneck is. Thanks Best Regards On Tue, Dec 16, 2014 at 9:59 AM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote: Is that possible, if not, how would one do it from PySpark ? This probably does not make sense in most cases, but am writing a script where my job involves downloading and pushing data into cassandra.. sometimes a task hangs forever, and I dont really mind killing it.. The job is not actually computing some result that requires all tasks to succeed. Thanks, Mohamed.
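A bare-bones sketch of that listener hook; the class name and threshold are made up, and note this only observes task times, it does not kill anything (Spark has no per-task timeout):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Logs any task that ran longer than a threshold once it finally finishes.
class SlowTaskLogger(thresholdMs: Long) extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    if (info.duration > thresholdMs) {
      println(s"Task ${info.taskId} on ${info.host} took ${info.duration} ms")
    }
  }
}

sc.addSparkListener(new SlowTaskLogger(10 * 60 * 1000)) // 10 minutes, assumed threshold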
Re: Fetch Failed caused job failed.
You could try setting the following while creating the sparkContext .set(spark.rdd.compress,true) .set(spark.storage.memoryFraction,1) .set(spark.core.connection.ack.wait.timeout,600) .set(spark.akka.frameSize,50) Thanks Best Regards On Tue, Dec 16, 2014 at 8:30 AM, Mars Max m...@baidu.com wrote: While I was running spark MR job, there was FetchFailed(BlockManagerId(47, xx.com, 40975, 0), shuffleId=2, mapId=5, reduceId=286), then there were many retries, and the job failed finally. And the log showed the following error, does anybody meet this error ? or is it a known issue in Spark ? Thanks. 4/12/16 10:43:43 ERROR PythonRDD: Python worker exited unexpectedly (crashed) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /home/spark/spark-1.1/python/pyspark/worker.py, line 75, in main command = pickleSer._read_with_length(infile) File /home/spark/spark-1.1/python/pyspark/serializers.py, line 146, in _read_with_length length = read_int(stream) File /home/spark/spark-1.1/python/pyspark/serializers.py, line 464, in read_int raise EOFError EOFError at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:154) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) Caused by: org.apache.spark.shuffle.FetchFailedException: Fetch failed: BlockManagerId(47, nmg01-taihang-d11609.nmg01.baidu.com, 40975, 0) 2 5 286 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:68) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:335) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) at 
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183) 14/12/16 10:43:43 ERROR PythonRDD: This may have been caused by a prior exception: org.apache.spark.shuffle.FetchFailedException: Fetch failed: BlockManagerId(47, nmg01-taihang-d11609.nmg01.baidu.com, 40975, 0) 2 5 286 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:68) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) at
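The settings suggested above, written out with the quotes the archive drops, as a compilable sketch; these are the values from the reply, not general recommendations, and the app name is assumed:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fetch-failed-job") // assumed name
  .set("spark.rdd.compress", "true")
  .set("spark.storage.memoryFraction", "1")
  .set("spark.core.connection.ack.wait.timeout", "600")
  .set("spark.akka.frameSize", "50")
val sc = new SparkContext(conf)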
Re: NumberFormatException
There could be some other character like a space or ^M etc. You could try the following and see the actual row. val newstream = datastream.map(row = { try{ val strArray = str.trim().split(,) (strArray(0).toInt, strArray(1).toInt) //Instead try this //*(strArray(0).trim().toInt, strArray(1).trim().toInt)* }catch{ case e: Exception = println(W000t!! Exception!! = + e + \n The line was : + row); (0, 0) } }) Thanks Best Regards On Tue, Dec 16, 2014 at 3:19 AM, yu yuz1...@iastate.edu wrote: Hello, everyone I know 'NumberFormatException' is due to the reason that String can not be parsed properly, but I really can not find any mistakes for my code. I hope someone may kindly help me. My hdfs file is as follows: 8,22 3,11 40,10 49,47 48,29 24,28 50,30 33,56 4,20 30,38 ... So each line contains an integer + , + an integer + \n My code is as follows: object StreamMonitor { def main(args: Array[String]): Unit = { val myFunc = (str: String) = { val strArray = str.trim().split(,) (strArray(0).toInt, strArray(1).toInt) } val conf = new SparkConf().setAppName(StreamMonitor); val ssc = new StreamingContext(conf, Seconds(30)); val datastream = ssc.textFileStream(/user/yu/streaminput); val newstream = datastream.map(myFunc) newstream.saveAsTextFiles(output/, ); ssc.start() ssc.awaitTermination() } } The exception info is: 14/12/15 15:35:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, h3): java.lang.NumberFormatException: For input string: 8 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) java.lang.Integer.parseInt(Integer.java:492) java.lang.Integer.parseInt(Integer.java:527) scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229) scala.collection.immutable.StringOps.toInt(StringOps.scala:31) StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:9) StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:7) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) So based on the above info, 8 is the first number in the file and I think it should be parsed to integer without any problems. I know it may be a very stupid question and the answer may be very easy. But I really can not find the reason. I am thankful to anyone who helps! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NumberFormatException-tp20694.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
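The snippet above loses its quotes and arrows in the archive; a compilable version of the same defensive parse might look like this:

val newstream = datastream.map { row =>
  try {
    val strArray = row.trim().split(",")
    // trim each field too, in case of stray spaces or hidden characters around the comma
    (strArray(0).trim.toInt, strArray(1).trim.toInt)
  } catch {
    case e: Exception =>
      println("W000t!! Exception!! => " + e + "\n The line was : " + row)
      (0, 0)
  }
}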
Re: Run Spark job on Playframework + Spark Master/Worker in one Mac
Thanks for the response. Yes, I am using standalone mode. I couldn't find any errors, but WARN messages appear in the Spark master logs.

Here are the Spark master logs: https://gist.github.com/TomoyaIgarashi/72145c11d3769c7d1ddb
FYI, here are the Spark worker logs: https://gist.github.com/TomoyaIgarashi/0db77e93cacb4a93aa1f
Here are the Playframework logs: https://gist.github.com/TomoyaIgarashi/9688bdd5663af95ddd4d

If you have any comments, please let us know. Regards

2014-12-16 15:34 GMT+09:00 Aniket Bhatnagar aniket.bhatna...@gmail.com:

It seems you are using standalone mode. Can you check the Spark worker logs, or the application logs in the Spark work directory, to find any errors?

On Tue, Dec 16, 2014, 9:09 AM Tomoya Igarashi tomoya.igarashi.0...@gmail.com wrote:

Hi Aniket,

Thanks for your reply. I followed your advice and modified my code. Here is the latest version: https://github.com/TomoyaIgarashi/spark_cluster_sample/commit/ce7613c42d3adbe6ae44e264c11f3829460f3c35
As a result, it works correctly! Thank you very much. However, an AssociationError message appears at line 397 of the Playframework logs: https://gist.github.com/TomoyaIgarashi/9688bdd5663af95ddd4d
Is there any problem?

2014-12-15 18:48 GMT+09:00 Aniket Bhatnagar aniket.bhatna...@gmail.com:

Try the workaround (addClassPathJars(sparkContext, this.getClass.getClassLoader)) discussed in http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3CCAJOb8buD1B6tUtOfG8_Ok7F95C3=r-zqgffoqsqbjdxd427...@mail.gmail.com%3E
Thanks, Aniket

On Mon Dec 15 2014 at 07:43:24 Tomoya Igarashi tomoya.igarashi.0...@gmail.com wrote:

Hi all,

I am trying to run a Spark job on Playframework + Spark Master/Worker on one Mac. When the job ran, I encountered java.lang.ClassNotFoundException. Could you tell me how to solve it? Here is my code on GitHub: https://github.com/TomoyaIgarashi/spark_cluster_sample

* Environments:
Mac 10.9.5
Java 1.7.0_71
Playframework 2.2.3
Spark 1.1.1

* Setup history:
cd ~
git clone g...@github.com:apache/spark.git
cd spark
git checkout -b v1.1.1 v1.1.1
sbt/sbt assembly
vi ~/.bashrc
export SPARK_HOME=/Users/tomoya/spark
. ~/.bashrc
hostname
Tomoya-Igarashis-MacBook-Air.local
vi $SPARK_HOME/conf/slaves
Tomoya-Igarashis-MacBook-Air.local
play new spark_cluster_sample (default name, type: scala)

* Run history:
$SPARK_HOME/sbin/start-all.sh
jps
which play
/Users/tomoya/play/play
git clone https://github.com/TomoyaIgarashi/spark_cluster_sample
cd spark_cluster_sample
play run

* Error trace:
Here is the error trace in a Gist: https://gist.github.com/TomoyaIgarashi/4bd45ab3685a532f5511

Regards
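For reference, the workaround Aniket points to boils down to telling Spark which jars the Play application's classes live in, so that the standalone executors can fetch them and the ClassNotFoundException goes away. The sketch below is not the code from the linked thread; it is a minimal version of the same idea, and the helper name classPathJars, the master URL, and the assumption that all needed classes sit in jars on a URLClassLoader are illustrative only.

import java.net.URLClassLoader
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper (the thread's workaround is called addClassPathJars):
// walk the classloader chain and collect every jar it exposes.
def classPathJars(cl: ClassLoader): Seq[String] = cl match {
  case url: URLClassLoader =>
    url.getURLs.map(_.getFile).filter(_.endsWith(".jar")) ++ classPathJars(url.getParent)
  case null  => Seq.empty
  case other => classPathJars(other.getParent)
}

val conf = new SparkConf()
  .setAppName("spark_cluster_sample")
  .setMaster("spark://Tomoya-Igarashis-MacBook-Air.local:7077") // hostname from the post
  .setJars(classPathJars(getClass.getClassLoader))              // ship app jars to executors

val sc = new SparkContext(conf)

With the jars registered this way, the executors download them from the driver at task launch instead of expecting the Play classes to already be on their classpath.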
Accessing Apache Spark from Java
Hi,

I have installed Spark in standalone mode on a Linux server, and I am trying to access that Spark setup from Java on Windows. When I try connecting to Spark I see the following exception:

14/12/16 12:52:52 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/12/16 12:52:56 INFO AppClient$ClientActor: Connecting to master spark://01hw294954.INDIA:7077...
14/12/16 12:53:07 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/12/16 12:53:16 INFO AppClient$ClientActor: Connecting to master spark://01hw294954.INDIA:7077...
14/12/16 12:53:22 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/12/16 12:53:36 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
14/12/16 12:53:36 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/16 12:53:36 INFO TaskSchedulerImpl: Cancelling stage 0
14/12/16 12:53:36 INFO DAGScheduler: Failed to run collect at MySqlConnector.java:579
Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run

I have attached the Spark Master UI:

Spark Master at spark://01hw294954.INDIA:7077
URL: spark://01hw294954.INDIA:7077
Workers: 1
Cores: 2 Total, 0 Used
Memory: 835.0 MB Total, 0.0 B Used
Applications: 0 Running, 0 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE

Workers:
Id: worker-20141216123503-01hw294954.INDIA-38962 | Address: 01hw294954.INDIA:38962 | State: ALIVE | Cores: 2 (0 Used) | Memory: 835.0 MB (0.0 B Used)

Running Applications: none
Completed Applications: none
(columns: ID, Name, Cores, Memory per Node, Submitted Time, User, State, Duration)

My Spark slave is:

Spark Worker at 01hw294954.INDIA:38962
ID: worker-20141216123503-01hw294954.INDIA-38962
Master URL: spark://01hw294954.INDIA:7077
Cores: 2 (0 Used)
Memory: 835.0 MB (0.0 B Used)
Running Executors: 0

My Java master code looks like this:

SparkConf sparkConf = new SparkConf().setAppName("JdbcRddTest");
sparkConf.setMaster("spark://01hw294954.INDIA:7077");

When I tried using the same code with the local Spark setup as the master, it ran. Any help in solving this issue is very much appreciated.

Thanks and Regards
Jai

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Accessing-Apache-Spark-from-Java-tp20700.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
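The two symptoms in this thread ("Initial job has not accepted any resources" followed by "All masters are unresponsive") often come from the driver side rather than the cluster: a master URL or Spark version that does not match the cluster, or the Windows driver not being reachable from the Linux worker. Below is a hedged sketch of the driver configuration worth checking; the spark.driver.host value and the 512m executor memory are assumptions, the master URL is the one from the post, and the same SparkConf calls work unchanged from Java.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("JdbcRddTest")
  // Must match the URL shown at the top of the master web UI exactly, and the
  // client's Spark version must match the cluster's version.
  .setMaster("spark://01hw294954.INDIA:7077")
  // Hypothetical value: the Windows driver has to advertise an address the
  // worker can connect back to, otherwise executors never register.
  .set("spark.driver.host", "my-windows-box.example.com")
  // Stay inside the single worker's 835 MB so an executor can actually be granted.
  .set("spark.executor.memory", "512m")

val sc = new SparkContext(conf)

If the master URL and versions already match, checking that the worker host can resolve and reach the driver's advertised hostname and ports is usually the next step, since the registration handshake in standalone mode flows in both directions.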