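Well, the question is how you're referencing it. If you reference it statically (in Scala terms, a function on a top-level object), the call is resolved on the worker side against the classes shipped in the JAR. If you reference it in a way that refers to something on the driver side (e.g., a method of the enclosing class instance; the exception names CountActor, so presumably jsonMatches is defined inside it), serializing the closure will attempt to serialize that non-serializable instance and you'll get the exception you're seeing now. Shipping the JAR with addJar makes the class definitions available on the workers, but it doesn't change what the closure captures.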
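A minimal, Spark-free sketch of the difference, using plain Java serialization as a stand-in for what Spark does when it ships a filter closure to the workers. DriverSideOwner, the curried jsonMatches signature on CodeThatIsRemote, and isSerializable are hypothetical names for illustration, and line.contains(selector) is only a placeholder for the real JSON check:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the driver-side class that owns the filter
// logic (like CountActor in the error). It is NOT Serializable.
class DriverSideOwner(selector: String) {
  def matches(line: String): Boolean = line.contains(selector)

  // This lambda calls a method on `this`, so it captures the whole
  // DriverSideOwner instance; serializing the lambda drags that along.
  def filterFn: String => Boolean = line => matches(line)
}

// Hypothetical "CodeThatIsRemote"-style object: its code ships in the JAR,
// so a closure over it only needs to carry the selector String.
// Extending Serializable is a safety net in case the object is captured.
object CodeThatIsRemote extends Serializable {
  def jsonMatches(selector: String)(line: String): Boolean =
    line.contains(selector) // placeholder for the real JSON matching
}

object SerializationCheck {
  // Try to Java-serialize a value; report whether it worked.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

Against the RDD in the quoted message, that would look something like f.filter(CodeThatIsRemote.jsonMatches(userSuppliedEventSelector)) (an untested sketch): the closure then carries only the selector String, and the method body itself is loaded from the JAR on the worker.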
--
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Thu, Mar 13, 2014 at 9:20 AM, Ognen Duzlevski <og...@plainvanillagames.com> wrote:

> Hmm.
>
> The whole thing is packaged in a .jar file and I execute .addJar on the
> SparkContext. My expectation is that the whole jar together with that
> function is available on every worker automatically. Is that not a valid
> expectation?
>
> Ognen
>
> On 3/13/14, 11:09 AM, Paul Brown wrote:
>
> It's trying to send You just need to have the jsonMatches function
> available on the worker side of the interaction rather than on the driver
> side, e.g., put it on an object CodeThatIsRemote that gets shipped with the
> JARs and then filter(CodeThatIsRemote.jsonMatches) and you should be off to
> the races.
>
> On Thu, Mar 13, 2014 at 8:04 AM, Ognen Duzlevski <og...@nengoiksvelzud.com> wrote:
>
>> Hello,
>>
>> Is there anything special about calling functions that parse JSON lines
>> from filter?
>>
>> I have code that looks like this:
>>
>>   def jsonMatches(line: String): Boolean = {
>>     // take a line in JSON format (assumes a json4s-style parse, \ and JNothing)
>>     val jline = parse(line)
>>     val je = jline \ "event"
>>     if (je != JNothing && je.values.toString == userSuppliedEventSelector) {
>>       val jp = jline \ "properties"
>>       val lineVals = for {
>>         p <- userProps
>>         lp = jp \ p
>>         if lp != JNothing && lp.values.toString == userSuppliedValueSelector
>>       } yield lp.values.toString
>>       // compare as sets on both sides; a Set never equals a Seq in Scala
>>       if (userSuppliedVals.toSet == lineVals.toSet)
>>         return true
>>     }
>>     false
>>   }
>>
>> userSuppliedEventSelector and userSuppliedValueSelector are Strings
>> passed into the function that jsonMatches() is embedded within, as is the
>> following code.
>>
>> I then do something like:
>>
>>   val f = sc.textFile("hdfs://somefile")
>>   val ev1rdd = f.filter(jsonMatches).map(something => somethingElse)
>>
>> I am getting the following:
>>
>>   14/03/13 14:55:21 ERROR OneForOneStrategy: Job aborted: Task not
>>   serializable: java.io.NotSerializableException:
>>   com.github.ognenpv.pipeline.CountActor
>>
>> I suspect that the JSON parsing library is not serializable?
>>
>> Any ideas? What do people do to achieve what I want to do?
>>
>> Thanks!
>> Ognen