It looks like the problem is in the filter task - is there anything special about filter()?

I have removed the filter line from the loops just to see if things would work, and they do.
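My current guess, and it is only a guess: filter() itself is probably not special. Like map(), it ships its closure to the executors, so everything that closure captures has to be serializable. A minimal sketch of what I suspect, using f1 and event1 from the code below (matchesEvent is a hypothetical method, not something in my actual code):

    // Suspect pattern (sketch): if the lambda calls a method on the enclosing
    // actor, the closure captures `this`, and Spark then has to serialize the
    // whole CountActor to ship the task:
    //   val e1 = f1.filter(line => matchesEvent(line, event1))

    // Possible workaround (sketch): hoist the predicate into a local val so
    // the closure captures only the strings it actually needs:
    val isEvent1: String => Boolean =
      line => line.split(",")(0).split(":")(1).replace("\"", "") == event1
    val e1 = f1.filter(isEvent1)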

Does anyone have any ideas?

Thanks!
Ognen

On 3/6/14, 9:39 PM, Ognen Duzlevski wrote:
Hello,

What is the general approach people take when trying to do analysis across multiple large files where the data to be extracted from a successive file depends on the data extracted from a previous file or set of files?

For example:
I have a group of HDFS files, each 20+ GB in size. I need to extract event1 on day 1 from the first file, extract event2 from all remaining files over a period of successive dates, and then run a calculation on the two events. I then move on to day 2: extract event1 (with certain properties) from that day's file, extract event2 from all following days in the period, and run the same calculation against the previous day. And so on for every day in the period.

I have verified that the following (very naive) approach doesn't work:

def calcSimpleRetention(start: String, end: String, event1: String, event2: String): Map[String, List[Double]] = {
  val epd = new PipelineDate(end)
  val result = for {
    dt1 <- PipelineDate.getPeriod(new PipelineDate(start), epd)
    f1 = sc.textFile(dt1.toJsonHdfsFileName)
    // keep only event1 lines, keyed by the id in the third field, tagged 0
    e1 = f1.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event1)
           .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 0))
           .cache
    c = e1.count.toDouble
    intres = for {
      dt2 <- PipelineDate.getPeriod(dt1 + 1, epd)
      f2 = sc.textFile(dt2.toJsonHdfsFileName)
      // event2 lines from each following day, keyed the same way, tagged 1
      e2 = f2.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event2)
             .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 1))
      e1e2 = e1.union(e2)
      // ids that occur more than once and include a day-1 (tag 0) occurrence
      r = e1e2.groupByKey()
              .filter(e => e._2.length > 1 && e._2.filter(_ == 0).length > 0)
              .count.toDouble
    } yield c / r // get the retention rate
  } yield dt1.toString -> intres
  Map(result: _*)
}

I am getting the following errors:
14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33
14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false)
14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33)
14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List()
14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List()
14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents
14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33
14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I should mention that this code is fired off from an Akka actor (which is controlled by a Scalatra servlet).
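Given that the NotSerializableException names com.github.ognenpv.pipeline.CountActor, my working theory is that some closure in calcSimpleRetention ends up referencing the actor instance. One arrangement I have seen suggested (a sketch, untested; RetentionJobs and CalcRetention are hypothetical names, not my real code) is to keep all the Spark logic in a standalone object and have the actor only pass parameters through:

    import akka.actor.Actor
    import org.apache.spark.SparkContext

    case class CalcRetention(start: String, end: String) // hypothetical message type

    // All Spark logic lives in a standalone object, so no closure can capture
    // the (non-serializable) actor instance.
    object RetentionJobs {
      def calcSimpleRetention(sc: SparkContext, start: String, end: String,
                              event1: String, event2: String): Map[String, List[Double]] = {
        // ... same body as calcSimpleRetention above; closures here can only
        // capture method parameters and locals ...
        Map.empty
      }
    }

    class CountActor(sc: SparkContext) extends Actor {
      def receive = {
        case CalcRetention(start, end) =>
          sender ! RetentionJobs.calcSimpleRetention(sc, start, end, "event1", "event2")
      }
    }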

Any ideas, recommendations, etc.? I am fairly new to Scala and M/R principles in general; it is fair to say that at this point I am still thinking like an imperative programmer trying to fit a square peg into a round hole ;)
Ognen

--
Some people, when confronted with a problem, think "I know, I'll use regular 
expressions." Now they have two problems.
-- Jamie Zawinski
