Re: need someone to help clear some questions.
groups.google.com/forum/#!forum/shark-users Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Mar 6, 2014 at 8:08 PM, qingyang li liqingyang1...@gmail.comwrote: Hi, Yana, do you know if there is mailing list for shark like spark's? 2014-03-06 23:39 GMT+08:00 Yana Kadiyska yana.kadiy...@gmail.com: Hi qingyang, 1. You do not need to install shark on every node. 2. Not really sure..it's just a warning so I'd see if it works despite it 3. You need to provide the actual hdfs path, e.g. hdfs://namenode/user2/vols.csv, see this thread https://groups.google.com/forum/#!topic/tachyon-users/3Da4zcHKBbY Lastly as your questions are more shark than spark related there is a separate shark user group that might be more helpful. Hope this helps On Thu, Mar 6, 2014 at 3:25 AM, qingyang li liqingyang1...@gmail.comwrote: just a addition for #3, i have such configuration in shark-env.sh: export HADOOP_HOME=/usr/lib/hadoop export HADOOP_CONF_DIR=/etc/hadoop/conf export HIVE_HOME=/usr/lib/hive/ #export HIVE_CONF_DIR=/etc/hive/conf export MASTER=spark://bigdata001:7077 - 2014-03-06 16:20 GMT+08:00 qingyang li liqingyang1...@gmail.com: hi, spark community, i have setup 3 nodes cluster using spark 0.9 and shark 0.9, My question is : 1. is there any neccessary to install shark on every node since it is a client to use spark service ? 2. when i run shark-withinfo, i got such warning: WARN shark.SharkEnv: Hive Hadoop shims detected local mode, but Shark is not running locally. WARN shark.SharkEnv: Setting mapred.job.tracker to 'Spark_1394093746930' (was 'local') what does this log want to tell us ? is it a problem to run shark? 3. i want to load data from hdfs , so i run LOAD DATA INPATH '/user/root/input/test.txt' into table b; , but i got this error:No files matching path file:/user/root/input/test.txt , but this file exists on hdfs. thanks.
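As a concrete illustration of point 3, this is the shape the statement from the thread takes with a full HDFS URI (the namenode host and port here are assumptions; substitute your own):

    LOAD DATA INPATH 'hdfs://bigdata001:8020/user/root/input/test.txt' INTO TABLE b;

If HADOOP_CONF_DIR points at a configuration whose fs.default.name is already set to the HDFS namenode, the scheme-less '/user/root/input/test.txt' form should also resolve to HDFS rather than to file:/.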
Re: how to get size of rdd in memory
Addition: 1. I have run LOAD DATA INPATH '/user/root/input/test.txt' INTO TABLE b; in Shark. I think this will create an RDD in memory, right? 2. When I run free -g, the result shows that something has been stored in memory; the file is almost 4 GB.
[root@bigdata001 spark-0.9.0-incubating-bin-hadoop2]# free -g
             total   used   free   shared   buffers   cached
Mem:            15      6      8        0         0        4
-/+ buffers/cache:      2     13
Swap:            7      0      7
2014-03-07 16:51 GMT+08:00 qingyang li liqingyang1...@gmail.com: On that page it is empty; it does not show anything. Here is the picture. 2014-03-07 16:14 GMT+08:00 Mayur Rustagi mayur.rust...@gmail.com: http://<Master URL>:4040/storage/ Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Mar 7, 2014 at 12:09 AM, qingyang li liqingyang1...@gmail.com wrote: Dear community, can anyone tell me: how to get the size of an RDD in memory? Thanks.
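For reference, a minimal spark-shell sketch (paths are illustrative) of making an RDD show up on that page: an RDD is only listed under the Storage tab after it has been both marked for caching and materialized by an action.

    // in spark-shell; the path is a placeholder
    val data = sc.textFile("hdfs://bigdata001/user/root/input/test.txt")
    data.cache()   // mark the RDD to be kept in memory
    data.count()   // an action forces it to be computed and actually stored
    // the RDD, its storage level and its in-memory size should now appear
    // on the driver web UI, Storage tab (http://<driver-host>:4040/storage)

The same should apply on the Shark side: a table generally only shows up there once it has been cached (e.g. created as a _cached table) and a query has touched it.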
Re: Kryo serialization does not compress
Hi Patrick, Thanks for your reply. I am guessing even an array type will be registered automatically. Is this correct? Thanks, Pradeep -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-serialization-does-not-compress-tp2042p2400.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
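For reference, a minimal Kryo registrator sketch for Spark 0.9 (the Record class is hypothetical); registering the array class explicitly alongside the element class is harmless even if it turns out to be automatic:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    // hypothetical domain class
    case class Record(id: Long, payload: String)

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Record])
        kryo.register(classOf[Array[Record]]) // register the array type explicitly as well
      }
    }

    // enable Kryo and point Spark at the registrator (Spark 0.9 property names)
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "MyRegistrator")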
java.lang.ClassNotFoundException in spark 0.9.0, shark 0.9.0 (pre-release) and hadoop 2.2.0
Hi, We are currently trying to migrate to hadoop 2.2.0 and hence we have installed spark 0.9.0 and the pre-release version of shark 0.9.0. When we execute the script ( script.txt http://apache-spark-user-list.1001560.n3.nabble.com/file/n2401/script.txt ) we get the following error. /org.apache.spark.SparkException: Job aborted: Task 1.0:3 failed 4 times (most recent failure: Exception failure: java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) / Has anyone seen this error? If so, could you please help me get it corrected? Thanks, Pradeep -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-in-spark-0-9-0-shark-0-9-0-pre-release-and-hadoop-2-2-0-tp2401.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: major Spark performance problem
Hi, There is also an option to run Spark applications on top of Mesos in fine-grained mode. Then fair scheduling is possible (applications run in parallel and Mesos is responsible for scheduling all tasks), so in a sense all applications progress in parallel. In total it may not be faster; however, the benefit is the fair scheduling (small jobs will not be stuck behind the big ones). Best regards Lukasz Jastrzebski -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/major-Spark-performance-problem-tp2364p2403.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
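For completeness, a minimal sketch of pointing a Spark 0.9 application at Mesos in fine-grained mode (host names and paths are placeholders); fine-grained is the default, so the coarse flag only needs to be set if it was previously enabled:

    val conf = new org.apache.spark.SparkConf()
      .setMaster("mesos://mesos-master:5050")                         // placeholder host
      .setAppName("my-app")
      .set("spark.executor.uri", "hdfs:///spark/spark-0.9.0.tar.gz")  // assumption: a Spark build published for Mesos executors
      .set("spark.mesos.coarse", "false")                             // fine-grained mode (the default)
    val sc = new org.apache.spark.SparkContext(conf)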
Can anyone offer any insight at all?
What is wrong with this code? A condensed set of this code works in the spark-shell. It does not work when deployed via a jar. def calcSimpleRetention(start:String,end:String,event1:String,event2:String):List[Double] = { val spd = new PipelineDate(start) val epd = new PipelineDate(end) // filter for event1 events and return RDDs that are maps of user_ids and 0 val f = sc.textFile(spd.toJsonHdfsFileName) val ev1rdd = f.filter(_.split(,)(0).split(:)(1).replace(\,) == event1).map(line = (line.split(,)(2).split(:)(1).replace(\,),1)).cache val ev1c = ev1rdd.count.toDouble // do the same as above for event2 events, only substitute 0s with 1s val ev2rdds = for { dt - PipelineDate.getPeriod(spd+1,epd) val f1 = sc.textFile(dt.toJsonHdfsFileName) } yield (f1.filter(_.split(,)(0).split(:)(1).replace(\,) == event2).map(line = (line.split(,)(2).split(:)(1).replace(\,),1)).distinct) // cache all event1 and event2 RDDs ev2rdds.foreach(_.cache) val cts = for { ev2 - ev2rdds } yield ev2.count val retent = for { ev2rdd - ev2rdds val ret = ev1rdd.union(ev2rdd).groupByKey() } yield ret.filter(e = e._2.length 1 e._2.filter(_==0).length0) val rcts = retent.map(_.count) println(--) println(s${rcts}) println(s${cts}) for { c - rcts } yield(ev1c/c.toDouble) //Map(result:_*) } This is what this code prints: List(0, 0) List(785912, 825254) List(Infinity, Infinity) My question is: it does not appear that the union().groupBy().filter() segment is working (the List(0,0) output). The app is not failing, it finishes just fine. Any ideas? Ognen
Re: Can anyone offer any insight at all?
Strike that. Figured it out. Don't you just hate it when you fire off an email and you figure it out as it is being sent? ;) Ognen On 3/7/14, 12:41 PM, Ognen Duzlevski wrote: What is wrong with this code? A condensed set of this code works in the spark-shell. It does not work when deployed via a jar. def calcSimpleRetention(start:String,end:String,event1:String,event2:String):List[Double] = { val spd = new PipelineDate(start) val epd = new PipelineDate(end) // filter for event1 events and return RDDs that are maps of user_ids and 0 val f = sc.textFile(spd.toJsonHdfsFileName) val ev1rdd = f.filter(_.split(,)(0).split(:)(1).replace(\,) == event1).map(line = (line.split(,)(2).split(:)(1).replace(\,),1)).cache val ev1c = ev1rdd.count.toDouble // do the same as above for event2 events, only substitute 0s with 1s val ev2rdds = for { dt - PipelineDate.getPeriod(spd+1,epd) val f1 = sc.textFile(dt.toJsonHdfsFileName) } yield (f1.filter(_.split(,)(0).split(:)(1).replace(\,) == event2).map(line = (line.split(,)(2).split(:)(1).replace(\,),1)).distinct) // cache all event1 and event2 RDDs ev2rdds.foreach(_.cache) val cts = for { ev2 - ev2rdds } yield ev2.count val retent = for { ev2rdd - ev2rdds val ret = ev1rdd.union(ev2rdd).groupByKey() } yield ret.filter(e = e._2.length 1 e._2.filter(_==0).length0) val rcts = retent.map(_.count) println(--) println(s${rcts}) println(s${cts}) for { c - rcts } yield(ev1c/c.toDouble) //Map(result:_*) } This is what this code prints: List(0, 0) List(785912, 825254) List(Infinity, Infinity) My question is: it does not appear that the union().groupBy().filter() segment is working (the List(0,0) output). The app is not failing, it finishes just fine. Any ideas? Ognen -- Some people, when confronted with a problem, think I know, I'll use regular expressions. Now they have two problems. -- Jamie Zawinski
Re: Can anyone offer any insight at all?
the issue was with print? printing on worker? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Mar 7, 2014 at 10:43 AM, Ognen Duzlevski og...@plainvanillagames.com wrote: Strike that. Figured it out. Don't you just hate it when you fire off an email and you figure it out as it is being sent? ;) Ognen On 3/7/14, 12:41 PM, Ognen Duzlevski wrote: What is wrong with this code? A condensed set of this code works in the spark-shell. It does not work when deployed via a jar. def calcSimpleRetention(start:String,end:String,event1: String,event2:String):List[Double] = { val spd = new PipelineDate(start) val epd = new PipelineDate(end) // filter for event1 events and return RDDs that are maps of user_ids and 0 val f = sc.textFile(spd.toJsonHdfsFileName) val ev1rdd = f.filter(_.split(,)(0).split(:)(1).replace(\,) == event1).map(line = (line.split(,)(2).split(:) (1).replace(\,),1)).cache val ev1c = ev1rdd.count.toDouble // do the same as above for event2 events, only substitute 0s with 1s val ev2rdds = for { dt - PipelineDate.getPeriod(spd+1,epd) val f1 = sc.textFile(dt.toJsonHdfsFileName) } yield (f1.filter(_.split(,)(0).split(:)(1).replace(\,) == event2).map(line = (line.split(,)(2).split(:) (1).replace(\,),1)).distinct) // cache all event1 and event2 RDDs ev2rdds.foreach(_.cache) val cts = for { ev2 - ev2rdds } yield ev2.count val retent = for { ev2rdd - ev2rdds val ret = ev1rdd.union(ev2rdd).groupByKey() } yield ret.filter(e = e._2.length 1 e._2.filter(_==0).length0) val rcts = retent.map(_.count) println(--) println(s${rcts}) println(s${cts}) for { c - rcts } yield(ev1c/c.toDouble) //Map(result:_*) } This is what this code prints: List(0, 0) List(785912, 825254) List(Infinity, Infinity) My question is: it does not appear that the union().groupBy().filter() segment is working (the List(0,0) output). The app is not failing, it finishes just fine. Any ideas? Ognen -- Some people, when confronted with a problem, think I know, I'll use regular expressions. Now they have two problems. -- Jamie Zawinski
Re: Setting properties in core-site.xml for Spark and Hadoop to access
Set them as environment variable at boot configure both stacks to call on that.. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Mar 7, 2014 at 9:32 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: On spinning up a Spark cluster in EC2, I'd like to set a few configs that will allow me to access files in S3 without having to specify my AWS access and secret keys over and over, as described herehttp://stackoverflow.com/a/3033403/877069 . The properties are fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey. Is there a way to set these properties programmatically so that Spark (via the shell) and Hadoop (via distcp) are both aware of and use the values? I don't think SparkConf does what I need because I want Hadoop to also be aware of my AWS keys. When I set those properties using conf.set() in pyspark, distcp didn't appear to be aware of them. Nick -- View this message in context: Setting properties in core-site.xml for Spark and Hadoop to accesshttp://apache-spark-user-list.1001560.n3.nabble.com/Setting-properties-in-core-site-xml-for-Spark-and-Hadoop-to-access-tp2402.html Sent from the Apache Spark User List mailing list archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com.
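One related sketch (Scala; property names taken from the question, environment variable names are assumptions): the keys can be set on the SparkContext's Hadoop configuration, which covers reads done through Spark itself but, as noted in the question, does not help a separately launched distcp:

    // assumes the keys are exported as environment variables on the driver
    sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
    sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
    // after this, S3 paths should be readable without inline credentials
    val lines = sc.textFile("s3://my-bucket/some/path")   // bucket name is a placeholder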
Re: Can anyone offer any insight at all?
No. It was a logical error. val ev1rdd = f.filter(_.split(,)(0).split(:)(1).replace(\,) == event1).map(line = (line.split(,)(2).split(:)(1).replace(\,),1)).cache should have mapped to ,0, not ,1 I have had the most awful time figuring out these looped things. It seems like it is next to impossible to run a .filter() operation in a for loop, it seems to work if you yield .filter() Still don't understand why that is... Ognen On 3/7/14, 1:05 PM, Mayur Rustagi wrote: the issue was with print? printing on worker? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Mar 7, 2014 at 10:43 AM, Ognen Duzlevski og...@plainvanillagames.com mailto:og...@plainvanillagames.com wrote: Strike that. Figured it out. Don't you just hate it when you fire off an email and you figure it out as it is being sent? ;) Ognen On 3/7/14, 12:41 PM, Ognen Duzlevski wrote: What is wrong with this code? A condensed set of this code works in the spark-shell. It does not work when deployed via a jar. def calcSimpleRetention(start:String,end:String,event1:String,event2:String):List[Double] = { val spd = new PipelineDate(start) val epd = new PipelineDate(end) // filter for event1 events and return RDDs that are maps of user_ids and 0 val f = sc.textFile(spd.toJsonHdfsFileName) val ev1rdd = f.filter(_.split(,)(0).split(:)(1).replace(\,) == event1).map(line = (line.split(,)(2).split(:)(1).replace(\,),1)).cache val ev1c = ev1rdd.count.toDouble // do the same as above for event2 events, only substitute 0s with 1s val ev2rdds = for { dt - PipelineDate.getPeriod(spd+1,epd) val f1 = sc.textFile(dt.toJsonHdfsFileName) } yield (f1.filter(_.split(,)(0).split(:)(1).replace(\,) == event2).map(line = (line.split(,)(2).split(:)(1).replace(\,),1)).distinct) // cache all event1 and event2 RDDs ev2rdds.foreach(_.cache) val cts = for { ev2 - ev2rdds } yield ev2.count val retent = for { ev2rdd - ev2rdds val ret = ev1rdd.union(ev2rdd).groupByKey() } yield ret.filter(e = e._2.length 1 e._2.filter(_==0).length0) val rcts = retent.map(_.count) println(--) println(s${rcts}) println(s${cts}) for { c - rcts } yield(ev1c/c.toDouble) //Map(result:_*) } This is what this code prints: List(0, 0) List(785912, 825254) List(Infinity, Infinity) My question is: it does not appear that the union().groupBy().filter() segment is working (the List(0,0) output). The app is not failing, it finishes just fine. Any ideas? Ognen -- Some people, when confronted with a problem, think I know, I'll use regular expressions. Now they have two problems. -- Jamie Zawinski -- Some people, when confronted with a problem, think I know, I'll use regular expressions. Now they have two problems. -- Jamie Zawinski
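For readers hitting the same thing, a cleaned-up sketch of the two projections with the quoting restored and the event1 rows tagged with 0 (column positions follow the original code; the log format itself is an assumption):

    // event1 rows are tagged 0, event2 rows are tagged 1, so that after
    // union + groupByKey a retained user has both tags in its value list
    val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event1)
                  .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 0))
                  .cache()

    val ev2rdd = f1.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event2)
                   .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 1))
                   .distinct()

    // a user is retained if its grouped values contain both a 0 and a 1
    val retained = ev1rdd.union(ev2rdd).groupByKey()
                         .filter(e => e._2.length > 1 && e._2.filter(_ == 0).length > 0)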
Re: Running actions in loops
Mostly the job you are executing is not serializable, this typically happens when you have a library that is not serializable.. are you using any library like jodatime etc ? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: It looks like the problem is in the filter task - is there anything special about filter()? I have removed the filter line from the loops just to see if things will work and they do. Anyone has any ideas? Thanks! Ognen On 3/6/14, 9:39 PM, Ognen Duzlevski wrote: Hello, What is the general approach people take when trying to do analysis across multiple large files where the data to be extracted from a successive file depends on the data extracted from a previous file or set of files? For example: I have the following: a group of HDFS files each 20+GB in size. I need to extract event1 on day 1 from first file and extract event2 from all remaining files in a period of successive dates, then do a calculation on the two events. I then need to move on to day2, extract event1 (with certain properties), take all following days, extract event2 and run a calculation against previous day for all days in period. So on and so on. I have verified that the following (very naive approach doesn't work): def calcSimpleRetention(start:String,end:String,event1: String,event2:String):Map[String,List[Double]] = { val epd = new PipelineDate(end) val result = for { dt1 - PipelineDate.getPeriod(new PipelineDate(start), epd) val f1 = sc.textFile(dt1.toJsonHdfsFileName) val e1 = f1.filter(_.split(,)(0).split(:)(1).replace(\,) == event1).map(line = (line.split(,)(2).split(:) (1).replace(\,),0)).cache val c = e1.count.toDouble val intres = for { dt2 - PipelineDate.getPeriod(dt1+1,epd) val f2 = sc.textFile(dt2.toJsonHdfsFileName) val e2 = f2.filter(_.split(,)(0).split(:)(1).replace(\,) == event2).map(line = (line.split(,)(2).split(:) (1).replace(\,),1)) val e1e2 = e1.union(e2) val r = e1e2.groupByKey().filter(e = e._2.length 1 e._2.filter(_==0).length0).count.toDouble } yield (c/r) // get the retention rate } yield (dt1.toString-intres) Map(result:_*) } I am getting the following errors: 14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false) 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33) 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List() 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List() 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$ apache$spark$scheduler$DAGScheduler$$abortStage$1. apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$ apache$spark$scheduler$DAGScheduler$$abortStage$1. 
apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach( ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$ scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$ scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$ scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737) at org.apache.spark.scheduler.DAGScheduler.processEvent( DAGScheduler.scala:569) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$ $anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec( AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec( ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. runTask(ForkJoinPool.java:1339) at
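The NotSerializableException names the enclosing actor class (CountActor), which suggests the RDD closures are capturing `this`. A common workaround — a sketch under that assumption, not a confirmed diagnosis, with the class shape and field names purely illustrative — is to copy whatever the closure needs from the enclosing object into local vals, so only those values get serialized into the task:

    import akka.actor.Actor
    import org.apache.spark.SparkContext

    // illustrative shape only: a class whose instances are not serializable
    class CountActor(sc: SparkContext) extends Actor {
      val eventName = "signup"   // hypothetical field on the non-serializable actor

      def retentionCount(path: String): Long = {
        // using `eventName` directly inside filter() would pull `this` (the whole
        // actor) into the task closure; a local copy avoids that
        val ev = eventName
        sc.textFile(path)
          .filter(_.split(",")(0).split(":")(1).replace("\"", "") == ev)
          .count()
      }

      def receive = { case path: String => sender ! retentionCount(path) }
    }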
Re: Streaming JSON string from REST Api in Spring
Easiest is to use a queue, Kafka for example. So push your json request string into kafka, connect spark streaming to kafka pull data from it execute it. Spark streaming will split up the jobs pipeline the data. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Mar 6, 2014 at 6:24 PM, sonyjv sonyjvech...@yahoo.com wrote: Thanks Mayur for your response. I think I need to clarify the first part of my query. The JSON based REST API will be called by external interfaces. These requests needs to be processed in a streaming mode in Spark. I am not clear about the following points 1. How can JSON request string (50 per sec) be continuously streamed to Spark. 2. The processing of the request in Spark will not last long. But would require to be split into multiple steps to render fast initial response. So for coordinating the Spark jobs do I have to use Kafka or any other queues. Or can I directly stream from one job to another. Regards, Sony -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-JSON-string-from-REST-Api-in-Spring-tp2358p2383.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
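A minimal Spark 0.9 sketch of the Kafka-fronted approach (topic, consumer group and ZooKeeper address are placeholders): the REST endpoint publishes each JSON request string to a Kafka topic, and Spark Streaming consumes and processes it in batches:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext("spark://master:7077", "json-requests", Seconds(1))
    // (topic -> number of consumer threads); names are assumptions
    val stream = KafkaUtils.createStream(ssc, "zk-host:2181", "rest-consumer-group",
                                         Map("json-requests" -> 2))
    stream.map(_._2)                                        // drop the Kafka key, keep the JSON string
          .foreachRDD(rdd => rdd.take(5).foreach(println))  // stand-in for the real processing steps
    ssc.start()
    ssc.awaitTermination()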
Help connecting to the cluster
Hi Spark users, could someone help me out. My company has a fully functioning Spark cluster with Shark running on top of it (as part of the same cluster, on the same LAN). I'm interested in running raw Spark code against it but am running into the following issue -- it seems like the machine hosting the driver program needs to be reachable by the worker nodes (in my case the workers cannot route to the machine hosting the driver). Below is a snippet from my worker log:
14/03/03 20:45:28 INFO executor.StandaloneExecutorBackend: Connecting to driver: akka://spark@driver_ip:49081/user/StandaloneScheduler
14/03/03 20:45:29 ERROR executor.StandaloneExecutorBackend: Driver terminated or disconnected! Shutting down.
Does this sound right -- it's not clear to me why a worker would try to establish a connection to the driver -- the driver already connected successfully, as I see the program listed in the log... why is this connection not sufficient? If you use Amazon EC2, can you run the driver from your personal machine, or do you have to install an IDE on one of the Amazon machines in order to debug code? I am not too excited about the EC2 option as our data is proprietary... but if that's the shortest path to success, at least it would get me started on some toy examples. At the moment I'm not sure what my options are, other than running a VM cluster or EC2. Any help/insight would be greatly appreciated.
[BLOG] Spark on Cassandra w/ Calliope
FWIW - I posted some notes to help people get started quickly with Spark on C*. http://brianoneill.blogspot.com/2014/03/spark-on-cassandra-w-calliope.html (tnx again to Rohit and team for all of their help) -brian -- Brian ONeill CTO, Health Market Science (http://healthmarketscience.com) mobile:215.588.6024 blog: http://brianoneill.blogspot.com/ twitter: @boneill42
Re: [BLOG] Spark on Cassandra w/ Calliope
Nice, thanks :) Ognen On 3/7/14, 2:48 PM, Brian O'Neill wrote: FWIW - I posted some notes to help people get started quickly with Spark on C*. http://brianoneill.blogspot.com/2014/03/spark-on-cassandra-w-calliope.html (tnx again to Rohit and team for all of their help) -brian -- Brian ONeill CTO, Health Market Science (http://healthmarketscience.com) mobile:215.588.6024 blog: http://brianoneill.blogspot.com/ twitter: @boneill42 -- Some people, when confronted with a problem, think I know, I'll use regular expressions. Now they have two problems. -- Jamie Zawinski
Re: Running actions in loops
Mayur, have not thought of that. Yes, I use jodatime. What is the scope that this serialization issue applies to? Only the method making a call into / using such a library? The whole class the method using such a library belongs to? Sorry if it is a dumb question :) Ognen On 3/7/14, 1:29 PM, Mayur Rustagi wrote: Mostly the job you are executing is not serializable, this typically happens when you have a library that is not serializable.. are you using any library like jodatime etc ? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski og...@plainvanillagames.com mailto:og...@plainvanillagames.com wrote: It looks like the problem is in the filter task - is there anything special about filter()? I have removed the filter line from the loops just to see if things will work and they do. Anyone has any ideas? Thanks! Ognen On 3/6/14, 9:39 PM, Ognen Duzlevski wrote: Hello, What is the general approach people take when trying to do analysis across multiple large files where the data to be extracted from a successive file depends on the data extracted from a previous file or set of files? For example: I have the following: a group of HDFS files each 20+GB in size. I need to extract event1 on day 1 from first file and extract event2 from all remaining files in a period of successive dates, then do a calculation on the two events. I then need to move on to day2, extract event1 (with certain properties), take all following days, extract event2 and run a calculation against previous day for all days in period. So on and so on. I have verified that the following (very naive approach doesn't work): def calcSimpleRetention(start:String,end:String,event1:String,event2:String):Map[String,List[Double]] = { val epd = new PipelineDate(end) val result = for { dt1 - PipelineDate.getPeriod(new PipelineDate(start), epd) val f1 = sc.textFile(dt1.toJsonHdfsFileName) val e1 = f1.filter(_.split(,)(0).split(:)(1).replace(\,) == event1).map(line = (line.split(,)(2).split(:)(1).replace(\,),0)).cache val c = e1.count.toDouble val intres = for { dt2 - PipelineDate.getPeriod(dt1+1,epd) val f2 = sc.textFile(dt2.toJsonHdfsFileName) val e2 = f2.filter(_.split(,)(0).split(:)(1).replace(\,) == event2).map(line = (line.split(,)(2).split(:)(1).replace(\,),1)) val e1e2 = e1.union(e2) val r = e1e2.groupByKey().filter(e = e._2.length 1 e._2.filter(_==0).length0).count.toDouble } yield (c/r) // get the retention rate } yield (dt1.toString-intres) Map(result:_*) } I am getting the following errors: 14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false) 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33) 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List() 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List() 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io http://java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor org.apache.spark.SparkException: Job aborted: Task not serializable: java.io http://java.io.NotSerializableException: 
com.github.ognenpv.pipeline.CountActor at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org http://org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at
Re: Setting properties in core-site.xml for Spark and Hadoop to access
Mayur, So looking at the section on environment variables here: http://spark.incubator.apache.org/docs/latest/configuration.html#environment-variables, are you saying to set these options via SPARK_JAVA_OPTS -D? On a related note, in looking around I just discovered this command line tool for modifying XML files called XMLStarlet: http://xmlstar.sourceforge.net/overview.php. Perhaps I should instead set these S3 keys directly in the right core-site.xml using XMLStarlet. Devs/Everyone, On a related note, I discovered that Spark (on EC2) reads Hadoop options from /root/ephemeral-hdfs/conf/core-site.xml. This is surprising given the variety of copies of core-site.xml on the EC2 cluster that gets built by spark-ec2. A quick search yields the following relevant results (snipped):
find / -name core-site.xml 2> /dev/null
/root/mapreduce/conf/core-site.xml
/root/persistent-hdfs/conf/core-site.xml
/root/ephemeral-hdfs/conf/core-site.xml
/root/spark/conf/core-site.xml
It looks like both pyspark and ephemeral-hdfs/bin/hadoop read configs from the ephemeral-hdfs core-site.xml file. The latter is expected; the former is not. Is this intended behavior? I expected pyspark to read configs from the spark core-site.xml file. The moment I remove my AWS credentials from the ephemeral-hdfs config file, pyspark cannot open files in S3 without me providing the credentials in-line. I also guessed that the config file under /root/mapreduce might be a kind of base config file that both Spark and Hadoop would read from first, and then override with configs from the other files. The path to the config suggests that, but it doesn't appear to be the case. Adding my AWS keys to that file seemed to affect neither Spark nor ephemeral-hdfs/bin/hadoop. Nick On Fri, Mar 7, 2014 at 2:07 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: Set them as environment variables at boot and configure both stacks to call on that.. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Mar 7, 2014 at 9:32 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: On spinning up a Spark cluster in EC2, I'd like to set a few configs that will allow me to access files in S3 without having to specify my AWS access and secret keys over and over, as described here: http://stackoverflow.com/a/3033403/877069 . The properties are fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey. Is there a way to set these properties programmatically so that Spark (via the shell) and Hadoop (via distcp) are both aware of and use the values? I don't think SparkConf does what I need because I want Hadoop to also be aware of my AWS keys. When I set those properties using conf.set() in pyspark, distcp didn't appear to be aware of them. Nick -- View this message in context: Setting properties in core-site.xml for Spark and Hadoop to access: http://apache-spark-user-list.1001560.n3.nabble.com/Setting-properties-in-core-site-xml-for-Spark-and-Hadoop-to-access-tp2402.html Sent from the Apache Spark User List mailing list archive: http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Class not found in Kafka-Stream due to multi-thread without correct ClassLoader?
Hi, I'm trying to run a kafka-stream and get a strange exception. The streaming is created by following code: val lines = KafkaUtils.createStream[String, VtrRecord, StringDecoder, VtrRecordDeserializer](ssc, kafkaParams.toMap, topicpMap, StorageLevel.MEMORY_AND_DISK_SER_2) 'VtrRecord' is generated by protobuf in the same package, 'VtrRecordDeserializer' is a Decoder to transfom byte[] to 'VtrRecord' as following: import com.aries.hawkeyes.VtrRecordProtos.VtrRecord class VtrRecordDeserializer(props: VerifiableProperties = null) extends kafka.serializer.Decoder[VtrRecord] { override def fromBytes(bytes : Array[Byte]) : VtrRecord = { VtrRecord.parseFrom(bytes) } } When the assembly jar(build by maven-shade-plugin) is submitted to the Spark cluster, I get the following ClassNotFoundException exception: java.lang.RuntimeException: Unable to find proto buffer class at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:616) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1075) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1779) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:104) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:57) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94) at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471) at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:679) Caused by: java.lang.ClassNotFoundException: com.aries.hawkeyes.VtrRecordProtos$VtrRecord at 
java.net.URLClassLoader$1.run(URLClassLoader.java:217) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:205) at java.lang.ClassLoader.loadClass(ClassLoader.java:321) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294) at java.lang.ClassLoader.loadClass(ClassLoader.java:266) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:186) at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:768) ... 34 more I have checked the assembly jar on the workers with `jar -tf`, 'com.aries.hawkeyes.VtrRecordProtos$VtrRecord' is definitely there. Also, to test whether the executor can load this class, I have tried 'System.out.println(Class.forName(com.aries.hawkeyes.VtrRecordProtos$VtrRecord))' in my application and 'Thread.currentThread.getContextClassLoader.loadClass(com.aries.hawkeyes.VtrRecordProtos$VtrRecord)' in
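One thing worth trying (a sketch, not a confirmed fix): make sure the shaded assembly is shipped to the executors through the SparkConf/StreamingContext jar list, so that the protobuf-generated classes are on the executor classpath rather than only on the driver. Paths and names below are placeholders:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setMaster("spark://master:7077")
      .setAppName("vtr-stream")
      .setJars(Seq("/path/to/app-assembly.jar"))  // the maven-shade assembly, shipped to executors
    val ssc = new StreamingContext(conf, Seconds(2))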
Re: Running actions in loops
So the whole function closure you want to apply on your RDD needs to be serializable so that it can be serialized sent to workers to operate on RDD. So objects of jodatime cannot be serialized sent hence jodatime is out of work. 2 bad answers 1. initialize jodatime for each row complete work destroy them, that way they are only intialized when job is running need not be sent across. 2. Write your own parser hope jodatime guys get their act together. Regards Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Mar 7, 2014 at 12:56 PM, Ognen Duzlevski og...@nengoiksvelzud.comwrote: Mayur, have not thought of that. Yes, I use jodatime. What is the scope that this serialization issue applies to? Only the method making a call into / using such a library? The whole class the method using such a library belongs to? Sorry if it is a dumb question :) Ognen On 3/7/14, 1:29 PM, Mayur Rustagi wrote: Mostly the job you are executing is not serializable, this typically happens when you have a library that is not serializable.. are you using any library like jodatime etc ? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: It looks like the problem is in the filter task - is there anything special about filter()? I have removed the filter line from the loops just to see if things will work and they do. Anyone has any ideas? Thanks! Ognen On 3/6/14, 9:39 PM, Ognen Duzlevski wrote: Hello, What is the general approach people take when trying to do analysis across multiple large files where the data to be extracted from a successive file depends on the data extracted from a previous file or set of files? For example: I have the following: a group of HDFS files each 20+GB in size. I need to extract event1 on day 1 from first file and extract event2 from all remaining files in a period of successive dates, then do a calculation on the two events. I then need to move on to day2, extract event1 (with certain properties), take all following days, extract event2 and run a calculation against previous day for all days in period. So on and so on. 
I have verified that the following (very naive approach doesn't work): def calcSimpleRetention(start:String,end:String,event1:String,event2:String):Map[String,List[Double]] = { val epd = new PipelineDate(end) val result = for { dt1 - PipelineDate.getPeriod(new PipelineDate(start), epd) val f1 = sc.textFile(dt1.toJsonHdfsFileName) val e1 = f1.filter(_.split(,)(0).split(:)(1).replace(\,) == event1).map(line = (line.split(,)(2).split(:)(1).replace(\,),0)).cache val c = e1.count.toDouble val intres = for { dt2 - PipelineDate.getPeriod(dt1+1,epd) val f2 = sc.textFile(dt2.toJsonHdfsFileName) val e2 = f2.filter(_.split(,)(0).split(:)(1).replace(\,) == event2).map(line = (line.split(,)(2).split(:)(1).replace(\,),1)) val e1e2 = e1.union(e2) val r = e1e2.groupByKey().filter(e = e._2.length 1 e._2.filter(_==0).length0).count.toDouble } yield (c/r) // get the retention rate } yield (dt1.toString-intres) Map(result:_*) } I am getting the following errors: 14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false) 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33) 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List() 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List() 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at
Re: Help connecting to the cluster
The driver contains the DAG scheduler, which manages the stages of jobs and needs to talk back and forth with the workers. So you can run the driver on any machine that can reach the master and the workers (even your laptop), but the driver will in turn need to be reachable from all the worker machines. I think 0.9.0 added an ability for the driver to be embedded in the cluster itself; I am not sure if it's general or restricted to Spark Streaming. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Mar 7, 2014 at 12:29 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi Spark users, could someone help me out. My company has a fully functioning Spark cluster with Shark running on top of it (as part of the same cluster, on the same LAN). I'm interested in running raw Spark code against it but am running into the following issue -- it seems like the machine hosting the driver program needs to be reachable by the worker nodes (in my case the workers cannot route to the machine hosting the driver). Below is a snippet from my worker log:
14/03/03 20:45:28 INFO executor.StandaloneExecutorBackend: Connecting to driver: akka://spark@driver_ip:49081/user/StandaloneScheduler
14/03/03 20:45:29 ERROR executor.StandaloneExecutorBackend: Driver terminated or disconnected! Shutting down.
Does this sound right -- it's not clear to me why a worker would try to establish a connection to the driver -- the driver already connected successfully, as I see the program listed in the log... why is this connection not sufficient? If you use Amazon EC2, can you run the driver from your personal machine, or do you have to install an IDE on one of the Amazon machines in order to debug code? I am not too excited about the EC2 option as our data is proprietary... but if that's the shortest path to success, at least it would get me started on some toy examples. At the moment I'm not sure what my options are, other than running a VM cluster or EC2. Any help/insight would be greatly appreciated.
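If the driver machine has several interfaces or sits behind NAT, a sketch of pinning the address the workers connect back to (Spark 0.9 property names; the address is a placeholder and must be routable from the workers):

    val conf = new org.apache.spark.SparkConf()
      .setMaster("spark://master:7077")
      .setAppName("my-driver")
      .set("spark.driver.host", "10.0.0.5")   // an address the workers can reach
      .set("spark.driver.port", "49081")      // optional: fix the port for firewall rules
    val sc = new org.apache.spark.SparkContext(conf)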
Re: Explain About Logs NetworkWordcount.scala
I am not sure how to debug this without any more information about the source. Can you monitor on the receiver side that data is being accepted by the receiver but not reported? TD On Wed, Mar 5, 2014 at 7:23 AM, eduardocalfaia e.costaalf...@unibs.itwrote: Hi TD, I have seen in the web UI the stage number that result has been zero and in the field GC Times there is nothing. http://apache-spark-user-list.1001560.n3.nabble.com/file/n2306/CaptureStage.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Explain-About-Logs-NetworkWordcount-scala-tp1835p2306.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Running actions in loops
There is #3 which is use mapPartitions and init one jodatime obj per partition, which is less overhead for large objects— Sent from Mailbox for iPhone On Sat, Mar 8, 2014 at 2:54 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: So the whole function closure you want to apply on your RDD needs to be serializable so that it can be serialized sent to workers to operate on RDD. So objects of jodatime cannot be serialized sent hence jodatime is out of work. 2 bad answers 1. initialize jodatime for each row complete work destroy them, that way they are only intialized when job is running need not be sent across. 2. Write your own parser hope jodatime guys get their act together. Regards Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Mar 7, 2014 at 12:56 PM, Ognen Duzlevski og...@nengoiksvelzud.comwrote: Mayur, have not thought of that. Yes, I use jodatime. What is the scope that this serialization issue applies to? Only the method making a call into / using such a library? The whole class the method using such a library belongs to? Sorry if it is a dumb question :) Ognen On 3/7/14, 1:29 PM, Mayur Rustagi wrote: Mostly the job you are executing is not serializable, this typically happens when you have a library that is not serializable.. are you using any library like jodatime etc ? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: It looks like the problem is in the filter task - is there anything special about filter()? I have removed the filter line from the loops just to see if things will work and they do. Anyone has any ideas? Thanks! Ognen On 3/6/14, 9:39 PM, Ognen Duzlevski wrote: Hello, What is the general approach people take when trying to do analysis across multiple large files where the data to be extracted from a successive file depends on the data extracted from a previous file or set of files? For example: I have the following: a group of HDFS files each 20+GB in size. I need to extract event1 on day 1 from first file and extract event2 from all remaining files in a period of successive dates, then do a calculation on the two events. I then need to move on to day2, extract event1 (with certain properties), take all following days, extract event2 and run a calculation against previous day for all days in period. So on and so on. 
I have verified that the following (very naive approach doesn't work): def calcSimpleRetention(start:String,end:String,event1:String,event2:String):Map[String,List[Double]] = { val epd = new PipelineDate(end) val result = for { dt1 - PipelineDate.getPeriod(new PipelineDate(start), epd) val f1 = sc.textFile(dt1.toJsonHdfsFileName) val e1 = f1.filter(_.split(,)(0).split(:)(1).replace(\,) == event1).map(line = (line.split(,)(2).split(:)(1).replace(\,),0)).cache val c = e1.count.toDouble val intres = for { dt2 - PipelineDate.getPeriod(dt1+1,epd) val f2 = sc.textFile(dt2.toJsonHdfsFileName) val e2 = f2.filter(_.split(,)(0).split(:)(1).replace(\,) == event2).map(line = (line.split(,)(2).split(:)(1).replace(\,),1)) val e1e2 = e1.union(e2) val r = e1e2.groupByKey().filter(e = e._2.length 1 e._2.filter(_==0).length0).count.toDouble } yield (c/r) // get the retention rate } yield (dt1.toString-intres) Map(result:_*) } I am getting the following errors: 14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false) 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33) 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List() 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List() 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at
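A sketch of option #3 above (the date pattern, path and field position are assumptions): the joda-time formatter is created once per partition, inside the task, so it never has to be serialized from the driver:

    import org.joda.time.format.DateTimeFormat

    val parsed = sc.textFile("hdfs:///path/to/events.json").mapPartitions { iter =>
      // constructed on the worker, once per partition, so nothing joda-related
      // has to travel through the task closure
      val fmt = DateTimeFormat.forPattern("yyyy-MM-dd")
      iter.map { line =>
        val day = fmt.parseDateTime(line.split(",")(1))   // field index is illustrative
        (day.toString("yyyy-MM-dd"), 1)
      }
    }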