I must be really dense! :)
Here is the most simplified version of the code; I removed a bunch of
stuff and hard-coded the "event" and "Sign Up" values.
def jsonMatches(line: String): Boolean = {
  val jLine = parse(line)
  // extract the "event" field from the line
  val e = jLine \ "event"
  if (e == JNothing) false
  else {
    if (e.values != "Sign Up") // there is no such event as we passed in
      false
    else
      true
  }
}
val f = sc.textFile("hdfs://10.10.0.198:54310/test/2013-12-01.json")
val ev1rdd = f.filter(jsonMatches).map(line =>
  (line.split(",")(2).split(":")(1).replace("\"", ""), 0)).distinct.cache
val ev1ct = ev1rdd.count.toDouble
val ev1ct = ev1rdd.count.toDouble
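For what it's worth, the usual fix for this exact trace is to move the predicate onto a standalone object, so the closure passed to filter() no longer captures the enclosing (non-serializable) actor. A minimal sketch, assuming the parse / \ / JNothing DSL above is json4s or lift-json (both expose this API); the object name JsonFilters is illustrative, not from the thread:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Functions on an object are referenced statically, so the task closure
// ships only this reference instead of dragging in the CountActor instance.
object JsonFilters extends Serializable {
  def jsonMatches(line: String): Boolean = {
    val jLine = parse(line)
    val e = jLine \ "event"
    e != JNothing && e.values == "Sign Up"
  }
}
```

Then, inside the actor, the pipeline stays the same except for the reference: f.filter(JsonFilters.jsonMatches).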
The code breaks with the output below. I am not sure how much simpler the
code can be :(
14/03/13 16:40:38 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 143.1 KB, free 682.5 MB)
14/03/13 16:40:39 INFO FileInputFormat: Total input paths to process : 1
14/03/13 16:40:40 INFO SparkContext: Starting job: count at CountActor.scala:119
14/03/13 16:40:40 INFO DAGScheduler: Registering RDD 5 (distinct at CountActor.scala:118)
14/03/13 16:40:40 INFO DAGScheduler: Got job 0 (count at CountActor.scala:119) with 140 output partitions (allowLocal=false)
14/03/13 16:40:40 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:119)
14/03/13 16:40:40 INFO DAGScheduler: Parents of final stage: List(Stage 1)
14/03/13 16:40:40 INFO DAGScheduler: Missing parents: List(Stage 1)
14/03/13 16:40:40 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[5] at distinct at CountActor.scala:118), which has no missing parents
14/03/13 16:40:40 INFO DAGScheduler: Failed to run count at CountActor.scala:119
14/03/13 16:40:40 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:741)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:740)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:740)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
On 3/13/14, 11:26 AM, Paul Brown wrote:
Well, the question is how you're referencing it. If you reference it
in a static fashion (function on an object, Scala-wise), then that's
dereferenced on the worker side. If you reference it in a way that
refers to something on the driver side, serializing the block will
attempt to serialize the non-serializable bits and you'll get the
exception you're seeing now.
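In code, the two cases Paul describes read roughly like this (a sketch; CountActor is from the thread, everything else is illustrative):

```scala
import org.apache.spark.rdd.RDD

class CountActor /* extends Actor */ {
  // Driver-side reference: jsonMatches is an instance method, so the
  // function passed to filter() closes over `this` (the whole actor),
  // and Spark tries to serialize it -> NotSerializableException.
  def jsonMatches(line: String): Boolean = line.contains("Sign Up")
  def bad(f: RDD[String]): RDD[String] = f.filter(jsonMatches)

  // Static reference: the function lives on an object, so nothing from
  // the driver-side actor is dragged into the serialized task.
  def good(f: RDD[String]): RDD[String] = f.filter(LineMatchers.jsonMatches)
}

object LineMatchers {
  def jsonMatches(line: String): Boolean = line.contains("Sign Up")
}
```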
—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/
On Thu, Mar 13, 2014 at 9:20 AM, Ognen Duzlevski
<og...@plainvanillagames.com> wrote:
Hmm.
The whole thing is packaged in a .jar file and I execute .addJar
on the SparkContext. My expectation is that the whole jar together
with that function is available on every worker automatically. Is
that not a valid expectation?
Ognen
On 3/13/14, 11:09 AM, Paul Brown wrote:
It's trying to send the enclosing object over the wire. You just need
to have the jsonMatches function available on the worker side of the
interaction rather than on the driver side, e.g., put it on an object
CodeThatIsRemote that gets shipped with the JARs, then
filter(CodeThatIsRemote.jsonMatches) and you should be off to the
races.
—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/
On Thu, Mar 13, 2014 at 8:04 AM, Ognen Duzlevski
<og...@nengoiksvelzud.com> wrote:
Hello,
Is there anything special about calling functions that parse
json lines from filter?
I have code that looks like this:
def jsonMatches(line: String): Boolean = {
  // takes a line in JSON format
  val jline = parse(line)
  val je = jline \ "event"
  if (je != JNothing && je.values.toString == userSuppliedEventSelector) {
    val jp = jline \ "properties"
    val lineVals = for {
      p <- userProps
      lp = jp \ p
      if lp != JNothing && lp.values.toString == userSuppliedValueSelector
    } yield lp.values.toString
    if (userSuppliedVals.toSet == lineVals.toSet)
      return true
  }
  false
}
userSuppliedEventSelector and userSuppliedValueSelector are Strings
passed into the function that jsonMatches() is embedded within, as is
the following code.
I then do something like:
val f = sc.textFile("hdfs://somefile")
val ev1rdd = f.filter(jsonMatches).map(something => somethingElse)
I am getting the following:
14/03/13 14:55:21 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
I suspect that the json parsing library is not serializable?
Any ideas? What do people do to achieve what I want to do?
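One common workaround, when the helper has to stay nested inside the actor's method because it needs driver-side parameters: copy those fields into plain local vals first, so the closure captures only serializable Strings rather than `this`. A sketch reusing the names from the thread (the surrounding method countMatching is illustrative):

```scala
def countMatching(f: RDD[String]): Long = {
  // Plain local copies: the nested def below closes over these Strings,
  // not over the enclosing (non-serializable) actor instance.
  val eventSel = userSuppliedEventSelector
  val valueSel = userSuppliedValueSelector

  def jsonMatches(line: String): Boolean = {
    val jline = parse(line)
    val je = jline \ "event"
    // ... same idea for valueSel and the "properties" checks ...
    je != JNothing && je.values.toString == eventSel
  }

  f.filter(jsonMatches).count()
}
```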
Thanks!
Ognen
--
Some people, when confronted with a problem, think "I know, I'll use regular expressions." Now they have two problems.
-- Jamie Zawinski