There is a #3, which is to use mapPartitions and initialize one jodatime object per partition; that has less overhead for large objects. A rough sketch is below.
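
A minimal sketch of what I mean (the RDD[String] of timestamp strings and the
pattern are made up for illustration): the formatter is created once per
partition, inside the closure, so it never has to be serialized and shipped
from the driver.

    import org.apache.spark.rdd.RDD
    import org.joda.time.format.DateTimeFormat

    def parseTimestamps(lines: RDD[String]): RDD[Long] =
      lines.mapPartitions { iter =>
        // Built on the worker, once per partition, so it never needs to be
        // serializable or sent over the wire.
        val fmt = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
        iter.map(s => fmt.parseDateTime(s).getMillis)
      }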

On Sat, Mar 8, 2014 at 2:54 AM, Mayur Rustagi <mayur.rust...@gmail.com>
wrote:

> The whole function closure you want to apply to your RDD needs to be
> serializable so that it can be serialized & sent to the workers that operate
> on the RDD. Jodatime objects cannot be serialized & sent, hence jodatime is
> out of the picture. Two (not great) answers:
> 1. Initialize jodatime for each row, do the work & destroy the object; that
> way it is only created while the task is running & need not be sent across
> (see the rough sketch below).
> 2. Write your own parser & hope the jodatime guys get their act together.
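>
> For option 1, a purely illustrative sketch (the RDD[String] of timestamps
> and the pattern are assumptions): the formatter exists only inside the map
> closure, so nothing non-serializable is captured from the driver.
>
>     import org.apache.spark.rdd.RDD
>     import org.joda.time.format.DateTimeFormat
>
>     def parsePerRow(lines: RDD[String]): RDD[Long] =
>       lines.map { s =>
>         // Created per record on the worker; never shipped from the driver.
>         val fmt = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
>         fmt.parseDateTime(s).getMillis
>       }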
> Regards
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
> On Fri, Mar 7, 2014 at 12:56 PM, Ognen Duzlevski
> <og...@nengoiksvelzud.com>wrote:
>> Mayur, I had not thought of that. Yes, I use jodatime. What scope does this
>> serialization issue apply to? Only the method that calls into / uses such a
>> library? The whole class that the method belongs to? Sorry if it is a dumb
>> question :)
>>
>> Ognen
>>
>>
>> On 3/7/14, 1:29 PM, Mayur Rustagi wrote:
>>
>> Most likely the job you are executing is not serializable; this typically
>> happens when you use a library that is not serializable. Are you using any
>> library like jodatime? A small illustration of how this comes about is below.
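>>
>> A small, hypothetical illustration (class and method names are made up):
>> referencing an instance field inside an RDD closure captures the whole
>> enclosing object, and if that object is not Serializable the task cannot be
>> shipped, which is what "Task not serializable" is complaining about.
>>
>>     import org.apache.spark.rdd.RDD
>>     import org.joda.time.format.DateTimeFormat
>>
>>     class Pipeline {                                    // not Serializable
>>       val fmt = DateTimeFormat.forPattern("yyyy-MM-dd") // lives on the driver
>>
>>       def parse(lines: RDD[String]): RDD[Long] =
>>         // `fmt` is really `this.fmt`, so the closure drags in `this` and
>>         // Spark fails with NotSerializableException for Pipeline.
>>         lines.map(s => fmt.parseDateTime(s).getMillis)
>>     }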
>>
>>  Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>>  @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>
>>
>>
>> On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski <
>> og...@plainvanillagames.com> wrote:
>>
>>> It looks like the problem is in the filter task - is there anything
>>> special about filter()?
>>>
>>> I have removed the filter line from the loops just to see if things will
>>> work and they do.
>>>
>>> Does anyone have any ideas?
>>>
>>> Thanks!
>>> Ognen
>>>
>>>
>>> On 3/6/14, 9:39 PM, Ognen Duzlevski wrote:
>>>
>>>> Hello,
>>>>
>>>> What is the general approach people take when trying to do analysis
>>>> across multiple large files where the data to be extracted from a
>>>> successive file depends on the data extracted from a previous file or set
>>>> of files?
>>>>
>>>> For example:
>>>> I have the following: a group of HDFS files, each 20+ GB in size. I need
>>>> to extract event1 on day 1 from the first file and extract event2 from all
>>>> remaining files over a period of successive dates, then do a calculation on
>>>> the two events.
>>>> I then need to move on to day 2, extract event1 (with certain
>>>> properties), take all following days, extract event2 and run the calculation
>>>> against the previous day for all days in the period, and so on.
>>>>
>>>> I have verified that the following (very naive) approach does not work:
>>>>
>>>> def calcSimpleRetention(start: String, end: String, event1: String,
>>>>     event2: String): Map[String, List[Double]] = {
>>>>   val epd = new PipelineDate(end)
>>>>   val result = for {
>>>>     dt1 <- PipelineDate.getPeriod(new PipelineDate(start), epd)
>>>>     val f1 = sc.textFile(dt1.toJsonHdfsFileName)
>>>>     val e1 = f1.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event1)
>>>>                .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 0)).cache
>>>>     val c = e1.count.toDouble
>>>>
>>>>     val intres = for {
>>>>       dt2 <- PipelineDate.getPeriod(dt1 + 1, epd)
>>>>       val f2 = sc.textFile(dt2.toJsonHdfsFileName)
>>>>       val e2 = f2.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event2)
>>>>                  .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 1))
>>>>       val e1e2 = e1.union(e2)
>>>>       val r = e1e2.groupByKey()
>>>>                   .filter(e => e._2.length > 1 && e._2.filter(_ == 0).length > 0)
>>>>                   .count.toDouble
>>>>     } yield (c / r) // get the retention rate
>>>>   } yield (dt1.toString -> intres)
>>>>   Map(result: _*)
>>>> }
>>>>
>>>> I am getting the following errors:
>>>> 14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33
>>>> 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false)
>>>> 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33)
>>>> 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List()
>>>> 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List()
>>>> 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents
>>>> 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33
>>>> 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
>>>> org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
>>>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> I should mention that this code is fired off from an Akka actor (which
>>>> is controlled by a Scalatra servlet).
>>>>
>>>> Any ideas, recommendations, etc.? I am fairly new to Scala and M/R
>>>> principles in general; it is fair to say that at this point I am still
>>>> thinking from the point of view of an imperative programmer trying to fit
>>>> a square peg through a round hole ;)
>>>> Ognen
>>>>
>>>
>>>   --
>>> Some people, when confronted with a problem, think "I know, I'll use
>>> regular expressions." Now they have two problems.
>>> -- Jamie Zawinski
>>>
>>>
>>
>>
