Re: need someone to help clear some questions.

2014-03-07 Thread Mayur Rustagi
groups.google.com/forum/#!forum/shark-users

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Mar 6, 2014 at 8:08 PM, qingyang li liqingyang1...@gmail.comwrote:

 Hi, Yana, do you know if there is a mailing list for shark like spark's?


 2014-03-06 23:39 GMT+08:00 Yana Kadiyska yana.kadiy...@gmail.com:

 Hi qingyang,

 1. You do not need to install shark on every node.
 2. Not really sure..it's just a warning so I'd see if it works despite it
 3. You need to provide the actual hdfs path, e.g.
 hdfs://namenode/user2/vols.csv, see this thread
 https://groups.google.com/forum/#!topic/tachyon-users/3Da4zcHKBbY

 Lastly, as your questions are more shark- than spark-related, there is a
 separate shark user group that might be more helpful.
 Hope this helps


 On Thu, Mar 6, 2014 at 3:25 AM, qingyang li liqingyang1...@gmail.comwrote:

 just an addition for #3, I have the following configuration in shark-env.sh:
 
 export HADOOP_HOME=/usr/lib/hadoop
 export HADOOP_CONF_DIR=/etc/hadoop/conf
 export HIVE_HOME=/usr/lib/hive/
 #export HIVE_CONF_DIR=/etc/hive/conf
 export MASTER=spark://bigdata001:7077
 -


 2014-03-06 16:20 GMT+08:00 qingyang li liqingyang1...@gmail.com:

 hi, spark community, I have set up a 3-node cluster using spark 0.9 and
 shark 0.9. My questions are:
 1. Is it necessary to install shark on every node, since it is a client
 used to access the spark service?
 2. When I run shark-withinfo, I get these warnings:
  WARN shark.SharkEnv: Hive Hadoop shims detected local mode, but Shark
 is not running locally.
 WARN shark.SharkEnv: Setting mapred.job.tracker to
 'Spark_1394093746930' (was 'local')
 What are these logs trying to tell us? Are they a problem for running shark?
 3. I want to load data from hdfs, so I run LOAD DATA INPATH
 '/user/root/input/test.txt' into table b;  but I get this error: No files
 matching path file:/user/root/input/test.txt, although this file exists on
 hdfs.

 thanks.







Re: how to get size of rdd in memory

2014-03-07 Thread qingyang li
addition:
1. I have run LOAD DATA INPATH '/user/root/input/test.txt' into table b; in
shark. I think this will create an rdd in memory, right?
2. When I run free -g, the result shows that something has been stored in
memory. The file is almost 4 GB.

[root@bigdata001 spark-0.9.0-incubating-bin-hadoop2]# free -g
                   total   used   free  shared  buffers  cached
Mem:                  15      6      8       0        0       4
-/+ buffers/cache:             2     13
Swap:                  7      0      7



2014-03-07 16:51 GMT+08:00 qingyang li liqingyang1...@gmail.com:

 On that page it is empty; it does not show anything.
 Here is the picture.




 2014-03-07 16:14 GMT+08:00 Mayur Rustagi mayur.rust...@gmail.com:

 http://<Master URL>:4040/storage/

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Fri, Mar 7, 2014 at 12:09 AM, qingyang li liqingyang1...@gmail.comwrote:

 dear community, can anyone tell me how to get the size of an rdd in memory?
 thanks.






Re: Kryo serialization does not compress

2014-03-07 Thread pradeeps8
Hi Patrick,

Thanks for your reply.

I am guessing even an array type will be registered automatically. Is this
correct?
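
For reference, a minimal sketch of registering a class and its array type explicitly, in case it is not automatic (Foo is a placeholder class, and the wiring shown uses the 0.9-style system properties):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// placeholder class standing in for whatever the RDD actually holds
case class Foo(id: Int, name: String)

// register both the class and its array type so Kryo never has to fall back
// to writing fully-qualified class names into the serialized stream
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Foo])
    kryo.register(classOf[Array[Foo]])
  }
}

// enable Kryo and point Spark at the registrator before creating the SparkContext:
// System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")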

Thanks,
Pradeep



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-serialization-does-not-compress-tp2042p2400.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


java.lang.ClassNotFoundException in spark 0.9.0, shark 0.9.0 (pre-release) and hadoop 2.2.0

2014-03-07 Thread pradeeps8
Hi,

We are currently trying to migrate to hadoop 2.2.0 and hence we have
installed spark 0.9.0 and the pre-release version of shark 0.9.0.
When we execute the script ( script.txt
http://apache-spark-user-list.1001560.n3.nabble.com/file/n2401/script.txt 
) we get the following error.
org.apache.spark.SparkException: Job aborted: Task 1.0:3 failed 4 times
(most recent failure: Exception failure: java.lang.ClassNotFoundException:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Has anyone seen this error?
If so, could you please help me get it corrected?

Thanks,
Pradeep




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-in-spark-0-9-0-shark-0-9-0-pre-release-and-hadoop-2-2-0-tp2401.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: major Spark performance problem

2014-03-07 Thread elyast
Hi,

There is also an option to run spark applications on top of mesos in
fine-grained mode; then fair scheduling is possible (applications will run
in parallel and mesos is responsible for scheduling all tasks), so in a
sense all applications will progress in parallel. Obviously, in total it may
not be faster, but the benefit is the fair scheduling (small jobs will not
be stuck behind the big ones).
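
For illustration, a rough sketch of what that configuration looks like in Spark 0.9 (the mesos master URL and app name are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// fine-grained mode is the default on Mesos; spark.mesos.coarse=false makes it
// explicit, so every Spark task runs as its own Mesos task and the Mesos
// allocator can share cluster resources fairly across applications
val conf = new SparkConf()
  .setMaster("mesos://mesos-master:5050")   // placeholder Mesos master
  .setAppName("my-app")
  .set("spark.mesos.coarse", "false")
val sc = new SparkContext(conf)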

Best regards
Lukasz Jastrzebski



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/major-Spark-performance-problem-tp2364p2403.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Can anyone offer any insight at all?

2014-03-07 Thread Ognen Duzlevski

What is wrong with this code?

A condensed set of this code works in the spark-shell.

It does not work when deployed via a jar.

def calcSimpleRetention(start:String, end:String, event1:String, event2:String): List[Double] = {
    val spd = new PipelineDate(start)
    val epd = new PipelineDate(end)
    // filter for event1 events and return RDDs that are maps of user_ids and 0
    val f = sc.textFile(spd.toJsonHdfsFileName)
    val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1)
      .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).cache
    val ev1c = ev1rdd.count.toDouble

    // do the same as above for event2 events, only substitute 0s with 1s
    val ev2rdds = for {
       dt <- PipelineDate.getPeriod(spd+1,epd)
       val f1 = sc.textFile(dt.toJsonHdfsFileName)
    } yield (f1.filter(_.split(",")(0).split(":")(1).replace("\"","") == event2)
      .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).distinct)

    // cache all event1 and event2 RDDs
    ev2rdds.foreach(_.cache)
    val cts = for {
      ev2 <- ev2rdds
    } yield ev2.count

    val retent = for {
      ev2rdd <- ev2rdds
      val ret = ev1rdd.union(ev2rdd).groupByKey()
    } yield ret.filter(e => e._2.length > 1 && e._2.filter(_==0).length > 0)

    val rcts = retent.map(_.count)
    println("--")
    println(s"${rcts}")
    println(s"${cts}")

    for {
      c <- rcts
    } yield (ev1c/c.toDouble)
    //Map(result:_*)
  }

This is what this code prints:
List(0, 0)
List(785912, 825254)
List(Infinity, Infinity)

My question is: it does not appear that the 
union().groupBy().filter() segment is working (the List(0,0) 
output). The app is not failing, it finishes just fine.


Any ideas?
Ognen


Re: Can anyone offer any insight at all?

2014-03-07 Thread Ognen Duzlevski
Strike that. Figured it out. Don't you just hate it when you fire off an 
email and you figure it out as it is being sent? ;)

Ognen

On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:

What is wrong with this code?

A condensed set of this code works in the spark-shell.

It does not work when deployed via a jar.

def 
calcSimpleRetention(start:String,end:String,event1:String,event2:String):List[Double] 
= {

val spd = new PipelineDate(start)
val epd = new PipelineDate(end)
// filter for event1 events and return RDDs that are maps of 
user_ids and 0

val f = sc.textFile(spd.toJsonHdfsFileName)
val ev1rdd = 
f.filter(_.split(,)(0).split(:)(1).replace(\,) == 
event1).map(line = 
(line.split(,)(2).split(:)(1).replace(\,),1)).cache

val ev1c = ev1rdd.count.toDouble

// do the same as above for event2 events, only substitute 0s with 1s
val ev2rdds = for {
   dt - PipelineDate.getPeriod(spd+1,epd)
   val f1 = sc.textFile(dt.toJsonHdfsFileName)
} yield (f1.filter(_.split(,)(0).split(:)(1).replace(\,) 
== event2).map(line = 
(line.split(,)(2).split(:)(1).replace(\,),1)).distinct)


// cache all event1 and event2 RDDs
ev2rdds.foreach(_.cache)
val cts = for {
  ev2 - ev2rdds
} yield ev2.count

val retent = for {
  ev2rdd - ev2rdds
  val ret = ev1rdd.union(ev2rdd).groupByKey()
} yield ret.filter(e = e._2.length  1  
e._2.filter(_==0).length0)


val rcts = retent.map(_.count)
println(--) 


println(s${rcts})
println(s${cts})

for {
  c - rcts
} yield(ev1c/c.toDouble)
//Map(result:_*)
  }

This is what this code prints:
List(0, 0)
List(785912, 825254)
List(Infinity, Infinity)

My question is: it does not appear that the 
union().groupBy().filter() segment is working (the List(0,0) 
output). The app is not failing, it finishes just fine.


Any ideas?
Ognen


--
Some people, when confronted with a problem, think I know, I'll use regular 
expressions. Now they have two problems.
-- Jamie Zawinski



Re: Can anyone offer any insight at all?

2014-03-07 Thread Mayur Rustagi
the issue was with print?
printing on worker?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Mar 7, 2014 at 10:43 AM, Ognen Duzlevski 
og...@plainvanillagames.com wrote:

 Strike that. Figured it out. Don't you just hate it when you fire off an
 email and you figure it out as it is being sent? ;)
 Ognen


 On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:

 What is wrong with this code?

 A condensed set of this code works in the spark-shell.

 It does not work when deployed via a jar.

 def calcSimpleRetention(start:String,end:String,event1:
 String,event2:String):List[Double] = {
 val spd = new PipelineDate(start)
 val epd = new PipelineDate(end)
 // filter for event1 events and return RDDs that are maps of user_ids
 and 0
 val f = sc.textFile(spd.toJsonHdfsFileName)
 val ev1rdd = f.filter(_.split(,)(0).split(:)(1).replace(\,)
 == event1).map(line = (line.split(,)(2).split(:)
 (1).replace(\,),1)).cache
 val ev1c = ev1rdd.count.toDouble

 // do the same as above for event2 events, only substitute 0s with 1s
 val ev2rdds = for {
dt - PipelineDate.getPeriod(spd+1,epd)
val f1 = sc.textFile(dt.toJsonHdfsFileName)
 } yield (f1.filter(_.split(,)(0).split(:)(1).replace(\,) ==
 event2).map(line = (line.split(,)(2).split(:)
 (1).replace(\,),1)).distinct)

 // cache all event1 and event2 RDDs
 ev2rdds.foreach(_.cache)
 val cts = for {
   ev2 - ev2rdds
 } yield ev2.count

 val retent = for {
   ev2rdd - ev2rdds
   val ret = ev1rdd.union(ev2rdd).groupByKey()
 } yield ret.filter(e = e._2.length  1  e._2.filter(_==0).length0)

 val rcts = retent.map(_.count)
 println(--)

 println(s${rcts})
 println(s${cts})

 for {
   c - rcts
 } yield(ev1c/c.toDouble)
 //Map(result:_*)
   }

 This is what this code prints:
 List(0, 0)
 List(785912, 825254)
 List(Infinity, Infinity)

 My question is: it does not appear that the
 union().groupBy().filter() segment is working (the List(0,0) output).
 The app is not failing, it finishes just fine.

 Any ideas?
 Ognen


 --
 Some people, when confronted with a problem, think I know, I'll use
 regular expressions. Now they have two problems.
 -- Jamie Zawinski




Re: Setting properties in core-site.xml for Spark and Hadoop to access

2014-03-07 Thread Mayur Rustagi
Set them as environment variables at boot and configure both stacks to call
on that.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Mar 7, 2014 at 9:32 AM, Nicholas Chammas nicholas.cham...@gmail.com
 wrote:

 On spinning up a Spark cluster in EC2, I'd like to set a few configs that
 will allow me to access files in S3 without having to specify my AWS access
 and secret keys over and over, as described here:
 http://stackoverflow.com/a/3033403/877069

 The properties are fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey.

 Is there a way to set these properties programmatically so that Spark (via
 the shell) and Hadoop (via distcp) are both aware of and use the values?

 I don't think SparkConf does what I need because I want Hadoop to also be
 aware of my AWS keys. When I set those properties using conf.set() in
 pyspark, distcp didn't appear to be aware of them.
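
 For the Spark side alone (shown in Scala; assuming sc is the live SparkContext
 and the key strings are in scope), a minimal sketch of the programmatic route;
 note that a separately launched hadoop distcp still reads its own core-site.xml:

 // sets the keys on the Hadoop Configuration that this SparkContext hands to
 // its Hadoop input/output formats, so s3/s3n paths opened through sc work
 sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", awsAccessKeyId)
 sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey)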

 Nick


 --
 View this message in context: Setting properties in core-site.xml for
 Spark and Hadoop to access
 (http://apache-spark-user-list.1001560.n3.nabble.com/Setting-properties-in-core-site-xml-for-Spark-and-Hadoop-to-access-tp2402.html)
 Sent from the Apache Spark User List mailing list archive
 (http://apache-spark-user-list.1001560.n3.nabble.com/) at Nabble.com.



Re: Can anyone offer any insight at all?

2014-03-07 Thread Ognen Duzlevski

No.

It was a logical error.

val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") ==
event1).map(line =>
(line.split(",")(2).split(":")(1).replace("\"",""),1)).cache should have
mapped to ",0", not ",1".
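
In other words, a rough sketch of the corrected line (same parsing, only the tuple value changes):

val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1)
  .map(line => (line.split(",")(2).split(":")(1).replace("\"",""), 0))   // 0 marks event1 rows
  .cache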


I have had the most awful time figuring out these looped things. It seems
like it is next to impossible to run a .filter() operation inside a for
loop; it seems to work if you yield the .filter().


Still don't understand why that is...

Ognen

On 3/7/14, 1:05 PM, Mayur Rustagi wrote:

the issue was with print?
printing on worker?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Mar 7, 2014 at 10:43 AM, Ognen Duzlevski 
og...@plainvanillagames.com mailto:og...@plainvanillagames.com wrote:


Strike that. Figured it out. Don't you just hate it when you fire
off an email and you figure it out as it is being sent? ;)
Ognen


On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:

What is wrong with this code?

A condensed set of this code works in the spark-shell.

It does not work when deployed via a jar.

def

calcSimpleRetention(start:String,end:String,event1:String,event2:String):List[Double]
= {
val spd = new PipelineDate(start)
val epd = new PipelineDate(end)
// filter for event1 events and return RDDs that are maps
of user_ids and 0
val f = sc.textFile(spd.toJsonHdfsFileName)
val ev1rdd =
f.filter(_.split(,)(0).split(:)(1).replace(\,) ==
event1).map(line =
(line.split(,)(2).split(:)(1).replace(\,),1)).cache
val ev1c = ev1rdd.count.toDouble

// do the same as above for event2 events, only substitute
0s with 1s
val ev2rdds = for {
   dt - PipelineDate.getPeriod(spd+1,epd)
   val f1 = sc.textFile(dt.toJsonHdfsFileName)
} yield
(f1.filter(_.split(,)(0).split(:)(1).replace(\,) ==
event2).map(line =
(line.split(,)(2).split(:)(1).replace(\,),1)).distinct)

// cache all event1 and event2 RDDs
ev2rdds.foreach(_.cache)
val cts = for {
  ev2 - ev2rdds
} yield ev2.count

val retent = for {
  ev2rdd - ev2rdds
  val ret = ev1rdd.union(ev2rdd).groupByKey()
} yield ret.filter(e = e._2.length  1 
e._2.filter(_==0).length0)

val rcts = retent.map(_.count)

println(--)

println(s${rcts})
println(s${cts})

for {
  c - rcts
} yield(ev1c/c.toDouble)
//Map(result:_*)
  }

This is what this code prints:
List(0, 0)
List(785912, 825254)
List(Infinity, Infinity)

My question is: it does not appear that the
union().groupBy().filter() segment is working (the
List(0,0) output). The app is not failing, it finishes just fine.

Any ideas?
Ognen


-- 
Some people, when confronted with a problem, think I know, I'll

use regular expressions. Now they have two problems.
-- Jamie Zawinski




--
Some people, when confronted with a problem, think I know, I'll use regular 
expressions. Now they have two problems.
-- Jamie Zawinski



Re: Running actions in loops

2014-03-07 Thread Mayur Rustagi
Most likely the job you are executing is not serializable; this typically
happens when you use a library that is not serializable. Are you using
any library like jodatime etc.?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski og...@plainvanillagames.com
 wrote:

 It looks like the problem is in the filter task - is there anything
 special about filter()?

 I have removed the filter line from the loops just to see if things will
 work and they do.

 Anyone has any ideas?

 Thanks!
 Ognen


 On 3/6/14, 9:39 PM, Ognen Duzlevski wrote:

 Hello,

 What is the general approach people take when trying to do analysis
 across multiple large files where the data to be extracted from a
 successive file depends on the data extracted from a previous file or set
 of files?

 For example:
 I have the following: a group of HDFS files each 20+GB in size. I need to
 extract event1 on day 1 from first file and extract event2 from all
 remaining files in a period of successive dates, then do a calculation on
 the two events.
 I then need to move on to day2, extract event1 (with certain properties),
 take all following days, extract event2 and run a calculation against
 previous day for all days in period. So on and so on.

 I have verified that the following (very naive approach doesn't work):

 def calcSimpleRetention(start:String, end:String, event1:String, event2:String): Map[String,List[Double]] = {
     val epd = new PipelineDate(end)
     val result = for {
       dt1 <- PipelineDate.getPeriod(new PipelineDate(start), epd)
       val f1 = sc.textFile(dt1.toJsonHdfsFileName)
       val e1 = f1.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1)
         .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),0)).cache
       val c = e1.count.toDouble

       val intres = for {
         dt2 <- PipelineDate.getPeriod(dt1+1,epd)
         val f2 = sc.textFile(dt2.toJsonHdfsFileName)
         val e2 = f2.filter(_.split(",")(0).split(":")(1).replace("\"","") == event2)
           .map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1))
         val e1e2 = e1.union(e2)
         val r = e1e2.groupByKey().filter(e => e._2.length > 1 && e._2.filter(_==0).length > 0).count.toDouble
       } yield (c/r) // get the retention rate
     } yield (dt1.toString -> intres)
     Map(result:_*)
   }

 I am getting the following errors:
 14/03/07 03:22:25 INFO SparkContext: Starting job: count at
 CountActor.scala:33
 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at
 CountActor.scala:33) with 140 output partitions (allowLocal=false)
 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at
 CountActor.scala:33)
 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List()
 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List()
 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at
 map at CountActor.scala:32), which has no missing parents
 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at
 CountActor.scala:33
 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not
 serializable: java.io.NotSerializableException:
 com.github.ognenpv.pipeline.CountActor
 org.apache.spark.SparkException: Job aborted: Task not serializable:
 java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$
 apache$spark$scheduler$DAGScheduler$$abortStage$1.
 apply(DAGScheduler.scala:1028)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$
 apache$spark$scheduler$DAGScheduler$$abortStage$1.
 apply(DAGScheduler.scala:1026)
 at scala.collection.mutable.ResizableArray$class.foreach(
 ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
 scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
 scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
 scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
 at org.apache.spark.scheduler.DAGScheduler.processEvent(
 DAGScheduler.scala:569)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$
 $anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
 AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(
 ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
 runTask(ForkJoinPool.java:1339)
 at 

Re: Streaming JSON string from REST Api in Spring

2014-03-07 Thread Mayur Rustagi
Easiest is to use a queue, Kafka for example. So push your json request
string into kafka,
connect spark streaming to kafka  pull data from it  execute it.
Spark streaming will split up the jobs  pipeline the data.
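
A minimal sketch of the Kafka-to-Spark-Streaming side in 0.9 (zookeeper host, consumer group and topic name are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// one-second batches; each batch becomes an RDD of (key, message) pairs
val conf = new SparkConf().setAppName("json-requests")
val ssc = new StreamingContext(conf, Seconds(1))
val jsonStream = KafkaUtils
  .createStream(ssc, "zk-host:2181", "rest-consumer-group", Map("json-requests" -> 1))
  .map(_._2)   // keep only the message body, i.e. the JSON string

jsonStream.foreachRDD { rdd =>
  rdd.foreach(json => println(json))   // replace with the real per-request processing
}

ssc.start()
ssc.awaitTermination()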

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Mar 6, 2014 at 6:24 PM, sonyjv sonyjvech...@yahoo.com wrote:

 Thanks Mayur for your response.

 I think I need to clarify the first part of my query. The JSON-based REST
 API will be called by external interfaces. These requests need to be
 processed in streaming mode in Spark. I am not clear about the following
 points:

 1. How can the JSON request strings (50 per sec) be continuously streamed to
 Spark?
 2. The processing of a request in Spark will not last long, but it would
 need to be split into multiple steps to render a fast initial response. So
 for coordinating the Spark jobs, do I have to use Kafka or another queue,
 or can I directly stream from one job to another?

 Regards,
 Sony



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-JSON-string-from-REST-Api-in-Spring-tp2358p2383.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Help connecting to the cluster

2014-03-07 Thread Yana Kadiyska
Hi Spark users,

could someone help me out.

My company has a fully functioning spark cluster with shark running on
top of it (as part of the same cluster, on the same LAN) . I'm
interested in running raw spark code against it but am running against
the following issue -- it seems like the machine hosting the driver
program needs to be reachable by the worker nodes (in my case the
workers cannot route to the machine hosting the driver). Below is a
snippet from my worker log:

14/03/03 20:45:28 INFO executor.StandaloneExecutorBackend: Connecting
to driver: akka://spark@driver_ip:49081/user/StandaloneScheduler
14/03/03 20:45:29 ERROR executor.StandaloneExecutorBackend: Driver
terminated or disconnected! Shutting down.

Does this sound right -- it's not clear to me why a worker would try
to establish a connection to the driver -- the driver already
connected successfully, as I see the program listed in the log... why
is this connection not sufficient?

If you use Amazon EC2, can you run the driver from your personal
machine, or do you have to install an IDE on one of the Amazon machines in
order to debug code? I am not too excited about the EC2 option as our
data is proprietary... but if that's the shortest path to success, at
least it would get me started on some toy examples. At the moment I'm
not sure what my options are, other than running a VM cluster or EC2.

Any help/insight would be greatly appreciated.


[BLOG] Spark on Cassandra w/ Calliope

2014-03-07 Thread Brian O'Neill
FWIW - I posted some notes to help people get started quickly with Spark on
C*.
http://brianoneill.blogspot.com/2014/03/spark-on-cassandra-w-calliope.html

(tnx again to Rohit and team for all of their help)

-brian

-- 
Brian ONeill
CTO, Health Market Science (http://healthmarketscience.com)
mobile:215.588.6024
blog: http://brianoneill.blogspot.com/
twitter: @boneill42


Re: [BLOG] Spark on Cassandra w/ Calliope

2014-03-07 Thread Ognen Duzlevski

Nice, thanks :)
Ognen

On 3/7/14, 2:48 PM, Brian O'Neill wrote:


FWIW - I posted some notes to help people get started quickly with 
Spark on C*.

http://brianoneill.blogspot.com/2014/03/spark-on-cassandra-w-calliope.html

(tnx again to Rohit and team for all of their help)

-brian

--
Brian ONeill
CTO, Health Market Science (http://healthmarketscience.com)
mobile:215.588.6024
blog: http://brianoneill.blogspot.com/
twitter: @boneill42


--
Some people, when confronted with a problem, think I know, I'll use regular 
expressions. Now they have two problems.
-- Jamie Zawinski



Re: Running actions in loops

2014-03-07 Thread Ognen Duzlevski
Mayur, have not thought of that. Yes, I use jodatime. What is the scope 
that this serialization issue applies to? Only the method making a call 
into / using such a library? The whole class the method using such a 
library belongs to? Sorry if it is a dumb question :)


Ognen

On 3/7/14, 1:29 PM, Mayur Rustagi wrote:
Mostly the job you are executing is not serializable, this typically 
happens when you have a library that is not serializable.. are you 
using any library like jodatime etc ?


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski 
og...@plainvanillagames.com mailto:og...@plainvanillagames.com wrote:


It looks like the problem is in the filter task - is there
anything special about filter()?

I have removed the filter line from the loops just to see if
things will work and they do.

Anyone has any ideas?

Thanks!
Ognen


On 3/6/14, 9:39 PM, Ognen Duzlevski wrote:

Hello,

What is the general approach people take when trying to do
analysis across multiple large files where the data to be
extracted from a successive file depends on the data extracted
from a previous file or set of files?

For example:
I have the following: a group of HDFS files each 20+GB in
size. I need to extract event1 on day 1 from first file and
extract event2 from all remaining files in a period of
successive dates, then do a calculation on the two events.
I then need to move on to day2, extract event1 (with certain
properties), take all following days, extract event2 and run a
calculation against previous day for all days in period. So on
and so on.

I have verified that the following (very naive approach
doesn't work):

def

calcSimpleRetention(start:String,end:String,event1:String,event2:String):Map[String,List[Double]]
= {
val epd = new PipelineDate(end)
val result = for {
  dt1 - PipelineDate.getPeriod(new PipelineDate(start), epd)
  val f1 = sc.textFile(dt1.toJsonHdfsFileName)
  val e1 =
f1.filter(_.split(,)(0).split(:)(1).replace(\,) ==
event1).map(line =
(line.split(,)(2).split(:)(1).replace(\,),0)).cache
  val c = e1.count.toDouble

  val intres = for {
dt2 - PipelineDate.getPeriod(dt1+1,epd)
val f2 = sc.textFile(dt2.toJsonHdfsFileName)
val e2 =
f2.filter(_.split(,)(0).split(:)(1).replace(\,) ==
event2).map(line =
(line.split(,)(2).split(:)(1).replace(\,),1))
val e1e2 = e1.union(e2)
val r = e1e2.groupByKey().filter(e = e._2.length  1
 e._2.filter(_==0).length0).count.toDouble
  } yield (c/r) // get the retention rate
} yield (dt1.toString-intres)
Map(result:_*)
  }

I am getting the following errors:
14/03/07 03:22:25 INFO SparkContext: Starting job: count at
CountActor.scala:33
14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at
CountActor.scala:33) with 140 output partitions (allowLocal=false)
14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0
(count at CountActor.scala:33)
14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage:
List()
14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List()
14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0
(MappedRDD[3] at map at CountActor.scala:32), which has no
missing parents
14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at
CountActor.scala:33
14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task
not serializable: java.io
http://java.io.NotSerializableException:
com.github.ognenpv.pipeline.CountActor
org.apache.spark.SparkException: Job aborted: Task not
serializable: java.io
http://java.io.NotSerializableException:
com.github.ognenpv.pipeline.CountActor
at

org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at

org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at

scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org

http://org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at 

Re: Setting properties in core-site.xml for Spark and Hadoop to access

2014-03-07 Thread Nicholas Chammas
Mayur,

So looking at the section on environment variables here
(http://spark.incubator.apache.org/docs/latest/configuration.html#environment-variables),
are you saying to set these options via SPARK_JAVA_OPTS -D? On a related
note, in looking around I just discovered a command line tool for
modifying XML files called XMLStarlet
(http://xmlstar.sourceforge.net/overview.php).
Perhaps I should instead set these S3 keys directly in the right
core-site.xml using XMLStarlet.

Devs/Everyone,

On a related note, I discovered that Spark (on EC2) reads Hadoop options
from /root/ephemeral-hdfs/conf/core-site.xml.

This is surprising given the variety of copies of core-site.xml on the EC2
cluster that gets built by spark-ec2. A quick search yields the following
relevant results (snipped):

find / -name core-site.xml 2> /dev/null

/root/mapreduce/conf/core-site.xml
/root/persistent-hdfs/conf/core-site.xml
/root/ephemeral-hdfs/conf/core-site.xml
/root/spark/conf/core-site.xml


It looks like both pyspark and ephemeral-hdfs/bin/hadoop read configs from
the ephemeral-hdfs core-site.xml file. The latter is expected; the former
is not. Is this intended behavior?

I expected pyspark to read configs from the spark core-site.xml file. The
moment I remove my AWS credentials from the ephemeral-hdfs config file,
pyspark cannot open files in S3 without me providing the credentials
in-line.

I also guessed that the config file under /root/mapreduce might be a kind
of base config file that both Spark and Hadoop would read from first, and
then override with configs from the other files. The path to the config
suggests that, but it doesn't appear to be the case. Adding my AWS keys to
that file seemed to affect neither Spark nor ephemeral-hdfs/bin/hadoop.

Nick


On Fri, Mar 7, 2014 at 2:07 PM, Mayur Rustagi mayur.rust...@gmail.comwrote:

 Set them as environment variable at boot  configure both stacks to call
 on that..

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
  @mayur_rustagi https://twitter.com/mayur_rustagi



 On Fri, Mar 7, 2014 at 9:32 AM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 On spinning up a Spark cluster in EC2, I'd like to set a few configs that
 will allow me to access files in S3 without having to specify my AWS access
 and secret keys over and over, as described 
 herehttp://stackoverflow.com/a/3033403/877069
 .

 The properties are fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey.

 Is there a way to set these properties programmatically so that Spark
 (via the shell) and Hadoop (via distcp) are both aware of and use the
 values?

 I don't think SparkConf does what I need because I want Hadoop to also be
 aware of my AWS keys. When I set those properties using conf.set() in
 pyspark, distcp didn't appear to be aware of them.

 Nick


 --
 View this message in context: Setting properties in core-site.xml for
 Spark and Hadoop to 
 accesshttp://apache-spark-user-list.1001560.n3.nabble.com/Setting-properties-in-core-site-xml-for-Spark-and-Hadoop-to-access-tp2402.html
 Sent from the Apache Spark User List mailing list 
 archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com.





Class not found in Kafka-Stream due to multi-thread without correct ClassLoader?

2014-03-07 Thread Aries Kong
Hi,

I'm trying to run a kafka-stream and get a strange exception. The streaming
is created by the following code:

val lines = KafkaUtils.createStream[String, VtrRecord, StringDecoder, VtrRecordDeserializer](ssc,
  kafkaParams.toMap, topicpMap, StorageLevel.MEMORY_AND_DISK_SER_2)

'VtrRecord' is generated by protobuf in the same package;
'VtrRecordDeserializer' is a Decoder to transform byte[] to 'VtrRecord', as
follows:

import com.aries.hawkeyes.VtrRecordProtos.VtrRecord

class VtrRecordDeserializer(props: VerifiableProperties = null)
    extends kafka.serializer.Decoder[VtrRecord] {
  override def fromBytes(bytes : Array[Byte]) : VtrRecord = {
    VtrRecord.parseFrom(bytes)
  }
}


When the assembly jar (built by maven-shade-plugin) is submitted to the
Spark cluster, I get the following ClassNotFoundException:

java.lang.RuntimeException: Unable to find proto buffer class
at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1075)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1779)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:104)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:57)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.ClassNotFoundException: com.aries.hawkeyes.VtrRecordProtos$VtrRecord
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:186)
at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:768)
... 34 more


I have checked the assembly jar on the workers with `jar -tf`;
'com.aries.hawkeyes.VtrRecordProtos$VtrRecord' is definitely there.
Also, to test whether the executor can load this class, I have tried
'System.out.println(Class.forName("com.aries.hawkeyes.VtrRecordProtos$VtrRecord"))'
in my application and
'Thread.currentThread.getContextClassLoader.loadClass("com.aries.hawkeyes.VtrRecordProtos$VtrRecord")'
in 

Re: Running actions in loops

2014-03-07 Thread Mayur Rustagi
So the whole function closure you want to apply on your RDD needs to be
serializable so that it can be serialized and sent to workers to operate on
the RDD. Objects of jodatime cannot be serialized and sent, hence jodatime is
out of work. 2 bad answers:
1. Initialize jodatime for each row, complete the work and destroy it; that
way it is only initialized while the job is running and need not be sent
across (rough sketch below).
2. Write your own parser and hope the jodatime guys get their act together.
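
A rough sketch of option 1 (rdd, the date pattern and the field layout are placeholders):

import org.joda.time.format.DateTimeFormat

// the formatter is created inside the closure, on the worker, for each row;
// nothing from jodatime is captured by the closure, so nothing has to be serialized
val stamps = rdd.map { line =>
  val fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")   // placeholder pattern
  fmt.parseDateTime(line.split(",")(0))
}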

Regards


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Mar 7, 2014 at 12:56 PM, Ognen Duzlevski
og...@nengoiksvelzud.comwrote:

  Mayur, have not thought of that. Yes, I use jodatime. What is the scope
 that this serialization issue applies to? Only the method making a call
 into / using such a library? The whole class the method using such a
 library belongs to? Sorry if it is a dumb question :)

 Ognen


 On 3/7/14, 1:29 PM, Mayur Rustagi wrote:

 Mostly the job you are executing is not serializable, this typically
 happens when you have a library that is not serializable.. are you using
 any library like jodatime etc ?

  Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
  @mayur_rustagi https://twitter.com/mayur_rustagi



 On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski 
 og...@plainvanillagames.com wrote:

 It looks like the problem is in the filter task - is there anything
 special about filter()?

 I have removed the filter line from the loops just to see if things will
 work and they do.

 Anyone has any ideas?

 Thanks!
 Ognen


 On 3/6/14, 9:39 PM, Ognen Duzlevski wrote:

 Hello,

 What is the general approach people take when trying to do analysis
 across multiple large files where the data to be extracted from a
 successive file depends on the data extracted from a previous file or set
 of files?

 For example:
 I have the following: a group of HDFS files each 20+GB in size. I need
 to extract event1 on day 1 from first file and extract event2 from all
 remaining files in a period of successive dates, then do a calculation on
 the two events.
 I then need to move on to day2, extract event1 (with certain
 properties), take all following days, extract event2 and run a calculation
 against previous day for all days in period. So on and so on.

 I have verified that the following (very naive approach doesn't work):

 def
 calcSimpleRetention(start:String,end:String,event1:String,event2:String):Map[String,List[Double]]
 = {
 val epd = new PipelineDate(end)
 val result = for {
   dt1 - PipelineDate.getPeriod(new PipelineDate(start), epd)
   val f1 = sc.textFile(dt1.toJsonHdfsFileName)
   val e1 = f1.filter(_.split(,)(0).split(:)(1).replace(\,)
 == event1).map(line =
 (line.split(,)(2).split(:)(1).replace(\,),0)).cache
   val c = e1.count.toDouble

   val intres = for {
 dt2 - PipelineDate.getPeriod(dt1+1,epd)
 val f2 = sc.textFile(dt2.toJsonHdfsFileName)
 val e2 =
 f2.filter(_.split(,)(0).split(:)(1).replace(\,) ==
 event2).map(line = (line.split(,)(2).split(:)(1).replace(\,),1))
 val e1e2 = e1.union(e2)
 val r = e1e2.groupByKey().filter(e = e._2.length  1 
 e._2.filter(_==0).length0).count.toDouble
   } yield (c/r) // get the retention rate
 } yield (dt1.toString-intres)
 Map(result:_*)
   }

 I am getting the following errors:
 14/03/07 03:22:25 INFO SparkContext: Starting job: count at
 CountActor.scala:33
 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at
 CountActor.scala:33) with 140 output partitions (allowLocal=false)
 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at
 CountActor.scala:33)
 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List()
 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List()
 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at
 map at CountActor.scala:32), which has no missing parents
 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at
 CountActor.scala:33
 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not
 serializable: java.io.NotSerializableException:
 com.github.ognenpv.pipeline.CountActor
 org.apache.spark.SparkException: Job aborted: Task not serializable:
 java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
 at 

Re: Help connecting to the cluster

2014-03-07 Thread Mayur Rustagi
The driver contains the DAG scheduler, which manages the stages of jobs and
needs to talk back and forth with workers. So you can run the driver on any
machine that can reach the master and workers (even your laptop), but the
driver will need to be reachable from all machines (rough sketch below).
I think 0.9.0 added an ability for the driver to be embedded in the master; I
am not sure if it's general or restricted to Spark Streaming.
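
A minimal sketch of pinning the driver to an address and port the workers can route to (host, port and master URL are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master-host:7077")    // placeholder master URL
  .setAppName("my-driver")
  .set("spark.driver.host", "10.0.0.5")     // an address the workers can reach
  .set("spark.driver.port", "51000")        // open this port towards the workers
val sc = new SparkContext(conf)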


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Mar 7, 2014 at 12:29 PM, Yana Kadiyska yana.kadiy...@gmail.comwrote:

 Hi Spark users,

 could someone help me out.

 My company has a fully functioning spark cluster with shark running on
 top of it (as part of the same cluster, on the same LAN) . I'm
 interested in running raw spark code against it but am running against
 the following issue -- it seems like the machine hosting the driver
 program needs to be reachable by the worker nodes (in my case the
 workers cannot route to the machine hosting the driver). Below is a
 snippet from my worker log:

 14/03/03 20:45:28 INFO executor.StandaloneExecutorBackend: Connecting
 to driver: akka://spark@driver_ip:49081/user/StandaloneScheduler
 14/03/03 20:45:29 ERROR executor.StandaloneExecutorBackend: Driver
 terminated or disconnected! Shutting down.

 Does this sound right -- it's not clear to me why a worker would try
 to establish a connection to the driver -- the driver already
 connected successfully as I see the program listed in the logwhy
 is this connection not sufficient?

 If you use Amazon EC2, can you run the driver from your personal
 machine or do have to install an IDE on one of Amazon machines in
 order to debug code? I am not too excited about the EC2 option as our
 data is proprietary...but if that's the shortest path to success at
 least it would get me started on some toy examples. At the moment I'm
 not sure what my options are, other than running a VM cluster or EC2

 Any help/insight would be greatly appreciated.




Re: Explain About Logs NetworkWordcount.scala

2014-03-07 Thread Tathagata Das
I am not sure how to debug this without any more information about the
source. Can you monitor on the receiver side that data is being accepted by
the receiver but not reported?

TD


On Wed, Mar 5, 2014 at 7:23 AM, eduardocalfaia e.costaalf...@unibs.itwrote:

 Hi TD,
 I have seen in the web UI that the stage result has been zero, and there is
 nothing in the GC Time field.
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n2306/CaptureStage.png
 



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Explain-About-Logs-NetworkWordcount-scala-tp1835p2306.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Running actions in loops

2014-03-07 Thread Nick Pentreath
There is #3, which is to use mapPartitions and init one jodatime obj per
partition, which is less overhead for large objects (rough sketch below).
Sent from Mailbox for iPhone
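
A rough sketch of that (rdd and the date pattern are placeholders):

import org.joda.time.format.DateTimeFormat

val stamps = rdd.mapPartitions { iter =>
  // one formatter per partition, built on the worker, never shipped with the closure
  val fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")   // placeholder pattern
  iter.map(line => fmt.parseDateTime(line.split(",")(0)))
}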

On Sat, Mar 8, 2014 at 2:54 AM, Mayur Rustagi mayur.rust...@gmail.com
wrote:

 So the whole function closure you want to apply on your RDD needs to be
 serializable so that it can be serialized  sent to workers to operate on
 RDD. So objects of jodatime cannot be serialized  sent hence jodatime is
 out of work. 2 bad answers
 1. initialize jodatime for each row  complete work  destroy them, that
 way they are only intialized when job is running  need not be sent across.
 2. Write your own parser  hope jodatime guys get their act together.
 Regards
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi
 On Fri, Mar 7, 2014 at 12:56 PM, Ognen Duzlevski
 og...@nengoiksvelzud.comwrote:
  Mayur, have not thought of that. Yes, I use jodatime. What is the scope
 that this serialization issue applies to? Only the method making a call
 into / using such a library? The whole class the method using such a
 library belongs to? Sorry if it is a dumb question :)

 Ognen


 On 3/7/14, 1:29 PM, Mayur Rustagi wrote:

 Mostly the job you are executing is not serializable, this typically
 happens when you have a library that is not serializable.. are you using
 any library like jodatime etc ?

  Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
  @mayur_rustagi https://twitter.com/mayur_rustagi



 On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski 
 og...@plainvanillagames.com wrote:

 It looks like the problem is in the filter task - is there anything
 special about filter()?

 I have removed the filter line from the loops just to see if things will
 work and they do.

 Anyone has any ideas?

 Thanks!
 Ognen


 On 3/6/14, 9:39 PM, Ognen Duzlevski wrote:

 Hello,

 What is the general approach people take when trying to do analysis
 across multiple large files where the data to be extracted from a
 successive file depends on the data extracted from a previous file or set
 of files?

 For example:
 I have the following: a group of HDFS files each 20+GB in size. I need
 to extract event1 on day 1 from first file and extract event2 from all
 remaining files in a period of successive dates, then do a calculation on
 the two events.
 I then need to move on to day2, extract event1 (with certain
 properties), take all following days, extract event2 and run a calculation
 against previous day for all days in period. So on and so on.

 I have verified that the following (very naive approach doesn't work):

 def
 calcSimpleRetention(start:String,end:String,event1:String,event2:String):Map[String,List[Double]]
 = {
 val epd = new PipelineDate(end)
 val result = for {
   dt1 - PipelineDate.getPeriod(new PipelineDate(start), epd)
   val f1 = sc.textFile(dt1.toJsonHdfsFileName)
   val e1 = f1.filter(_.split(,)(0).split(:)(1).replace(\,)
 == event1).map(line =
 (line.split(,)(2).split(:)(1).replace(\,),0)).cache
   val c = e1.count.toDouble

   val intres = for {
 dt2 - PipelineDate.getPeriod(dt1+1,epd)
 val f2 = sc.textFile(dt2.toJsonHdfsFileName)
 val e2 =
 f2.filter(_.split(,)(0).split(:)(1).replace(\,) ==
 event2).map(line = (line.split(,)(2).split(:)(1).replace(\,),1))
 val e1e2 = e1.union(e2)
 val r = e1e2.groupByKey().filter(e = e._2.length  1 
 e._2.filter(_==0).length0).count.toDouble
   } yield (c/r) // get the retention rate
 } yield (dt1.toString-intres)
 Map(result:_*)
   }

 I am getting the following errors:
 14/03/07 03:22:25 INFO SparkContext: Starting job: count at
 CountActor.scala:33
 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at
 CountActor.scala:33) with 140 output partitions (allowLocal=false)
 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at
 CountActor.scala:33)
 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List()
 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List()
 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at
 map at CountActor.scala:32), which has no missing parents
 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at
 CountActor.scala:33
 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not
 serializable: java.io.NotSerializableException:
 com.github.ognenpv.pipeline.CountActor
 org.apache.spark.SparkException: Job aborted: Task not serializable:
 java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at