I even tried this:

def jsonMatches(line: String): Boolean = true

It is still failing with the same error.

Ognen

On 3/13/14, 11:45 AM, Ognen Duzlevski wrote:
I must be really dense! :)

Here is the most simplified version of the code, I removed a bunch of stuff and hard-coded the "event" and "Sign Up" lines.

def jsonMatches(line: String): Boolean = {
  val jLine = parse(line)
  // extract the "event" field from the line
  val e = jLine \ "event"
  if (e == JNothing) false
  else {
    if (e.values != "Sign Up") // not the event we passed in
      false
    else
      true
  }
}

    val f = sc.textFile("hfds://10.10.0.198:54310/test/2013-12-01.json")
val ev1rdd = f.filter(jsonMatches).map(line => (line.split(",")(2).split(":")(1).replace("\"",""),0)).distinct.cache
    val ev1ct = ev1rdd.count.toDouble

The code breaks with the output below. I am not sure how much simpler the code can be :(

14/03/13 16:40:38 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 143.1 KB, free 682.5 MB)
14/03/13 16:40:39 INFO FileInputFormat: Total input paths to process : 1
14/03/13 16:40:40 INFO SparkContext: Starting job: count at CountActor.scala:119
14/03/13 16:40:40 INFO DAGScheduler: Registering RDD 5 (distinct at CountActor.scala:118)
14/03/13 16:40:40 INFO DAGScheduler: Got job 0 (count at CountActor.scala:119) with 140 output partitions (allowLocal=false)
14/03/13 16:40:40 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:119)
14/03/13 16:40:40 INFO DAGScheduler: Parents of final stage: List(Stage 1)
14/03/13 16:40:40 INFO DAGScheduler: Missing parents: List(Stage 1)
14/03/13 16:40:40 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[5] at distinct at CountActor.scala:118), which has no missing parents
14/03/13 16:40:40 INFO DAGScheduler: Failed to run count at CountActor.scala:119
14/03/13 16:40:40 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:741)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:740)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:740)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


On 3/13/14, 11:26 AM, Paul Brown wrote:

Well, the question is how you're referencing it. If you reference it in a static fashion (a function on an object, Scala-wise), then it's dereferenced on the worker side. If you reference it in a way that refers to something on the driver side, then serializing the closure will attempt to serialize the non-serializable bits, and you'll get the exception you're seeing now.
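
Concretely, a minimal sketch (the names are illustrative and the trivial bodies stand in for your real code):

    // BAD: jsonMatches is a method on a class instance; filter(jsonMatches)
    // eta-expands to `line => this.jsonMatches(line)`, captures `this`, and
    // Spark then tries to serialize the whole (non-serializable) instance.
    class CountActor {
      def jsonMatches(line: String): Boolean = true
      // rdd.filter(jsonMatches)  // => NotSerializableException: CountActor
    }

    // GOOD: a function on a standalone object is resolved statically on the
    // worker side; nothing from the driver has to be serialized with it.
    object Filters {
      def jsonMatches(line: String): Boolean = true
    }
    // rdd.filter(Filters.jsonMatches)
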
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/


On Thu, Mar 13, 2014 at 9:20 AM, Ognen Duzlevski <og...@plainvanillagames.com> wrote:

    Hmm.

    The whole thing is packaged in a .jar file and I execute .addJar
    on the SparkContext. My expectation is that the whole jar
    together with that function is available on every worker
    automatically. Is that not a valid expectation?

    Ognen


    On 3/13/14, 11:09 AM, Paul Brown wrote:

    It's trying to send the enclosing object over the wire. You just
    need to have the jsonMatches function available on the worker
    side of the interaction rather than on the driver side, e.g., put
    it on an object CodeThatIsRemote that gets shipped with the JARs
    and then filter(CodeThatIsRemote.jsonMatches) and you should be
    off to the races.
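
    A minimal sketch of that, assuming a json4s-style parse/JNothing
    as in your snippet (the currying is just one way to pass your
    user-supplied selectors without capturing the enclosing class):

        import org.json4s._
        import org.json4s.jackson.JsonMethods.parse

        // A standalone object: referencing its method from filter()
        // does not drag the enclosing class into the closure.
        object CodeThatIsRemote {
          def jsonMatches(eventSelector: String)(line: String): Boolean = {
            val je = parse(line) \ "event"
            je != JNothing && je.values.toString == eventSelector
          }
        }

        // On the driver, copy any instance field into a local val
        // first, so the closure captures only the String:
        //   val sel = userSuppliedEventSelector
        //   val ev1rdd = f.filter(CodeThatIsRemote.jsonMatches(sel))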

    —
    p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/


    On Thu, Mar 13, 2014 at 8:04 AM, Ognen Duzlevski
    <og...@nengoiksvelzud.com <mailto:og...@nengoiksvelzud.com>> wrote:

        Hello,

        Is there anything special about calling functions that parse
        json lines from filter?

        I have code that looks like this:

        def jsonMatches(line: String): Boolean = {
          // takes a line in JSON format
          val jline = parse(line)
          val je = jline \ "event"
          if (je != JNothing && je.values.toString == userSuppliedEventSelector) {
            val jp = jline \ "properties"
            val lineVals = for {
              p <- userProps
              lp = jp \ p
              if lp != JNothing && lp.values.toString == userSuppliedValueSelector
            } yield lp.values.toString
            if (userSuppliedVals.toSet == lineVals.toSet)
              return true
          }
          false
        }

        userSuppliedEventSelector and userSuppliedValueSelector are
        Strings passed into the function that jsonMatches() is
        embedded within, as is the code that follows.

        I then do something like:

        val f = sc.textFile("hdfs://somefile")
        val ev1rdd = f.filter(jsonMatches).map(something =>
        somethingElse)

        I am getting the following:

        14/03/13 14:55:21 ERROR OneForOneStrategy: Job aborted: Task
        not serializable: java.io.NotSerializableException:
        com.github.ognenpv.pipeline.CountActor

        I suspect that the json parsing library is not serializable?

        Any ideas? What do people do to achieve what I want to do?

        Thanks!
        Ognen



--
Some people, when confronted with a problem, think "I know, I'll use
regular expressions." Now they have two problems.
    -- Jamie Zawinski



