Thanks TD, Amit.

I think I figured out where the problem is through the process of commenting 
out individual lines of code one at a time :(

Can either of you help me find the right solution? I tried creating the 
SparkContext outside the foreachRDD but that didn’t help.

I have an object (let’s say A) that is passed a SparkContext like this:

object A {
  def func1(sc: SparkContext) {
    // Do something with sc
  }
}

In my main object that creates the StreamingContext, I call object A’s func1 
method like this:

val ssc = new StreamingContext(spark, Seconds(batchTime))

ssc.checkpoint(checkPointDir)

val messageStream = KafkaConsumer.messageStream(ssc)

messageStream.foreachRDD(rdd => {
  A.func1(ssc.sparkContext)
})

Seems like the call A.func1(ssc.sparkContext) above is the cause of the 
exception.
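Would something like this avoid dragging ssc into the closure? (Just a sketch 
on my end, not tested; it pulls the SparkContext off the RDD itself instead of 
referencing ssc.)

messageStream.foreachRDD(rdd => {
  // Sketch: rdd.sparkContext gets the SparkContext from the RDD itself,
  // so the closure never has to reference ssc or the object that holds it.
  A.func1(rdd.sparkContext)
})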

Thanks,
Mahesh

From: Tathagata Das <tathagata.das1...@gmail.com>
Date: Thursday, August 7, 2014 at 1:11 PM
To: amit <amit.codenam...@gmail.com>
Cc: "u...@spark.incubator.apache.org" <u...@spark.incubator.apache.org>
Subject: Re: Spark 1.0.1 NotSerialized exception (a bit of a head scratcher)

From the extended info, I see that you have a function called 
createStreamingContext() in your code. Somehow that is getting referenced in 
the foreach function. Is the whole foreachRDD code inside the 
createStreamingContext() function? Did you try marking the ssc field as 
transient?
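
For example, a rough sketch of what I mean -- the class and constructor here 
are just placeholders for however your code actually holds ssc:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder enclosing class, just to illustrate the @transient idea.
class StreamingJob(conf: SparkConf, batchSeconds: Long) extends Serializable {
  // @transient keeps the StreamingContext out of the serialized closure
  // when the enclosing instance gets captured by a DStream operation.
  @transient private val ssc = new StreamingContext(conf, Seconds(batchSeconds))
}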

Here is a significantly different approach. Put the whole function to apply on 
each item in an object.

object MyFunctions {
  def processItem(enable: Boolean)(item: (Int, (Long, Long))): Unit = {
    val (key, (oc, dc)) = item
    DebugLogger.log("Original event count = " + oc)
    DebugLogger.log("Found " + (oc - dc) + " duplicate(s) in " + oc + " events")
    if (enable) {
      try {
        val statBody = Array(("batchCount", oc.toString),
          ("duplicateCount", (oc - dc).toString))
        OperationalStatProducer.produce(statBody)
      } catch { case e: Exception => DebugLogger.report(e) }
    }
  }
}



And then use it like this:

msgCount.join(ddCount).foreachRDD((rdd: RDD[(Int, (Long, Long))]) => {
  val enable = enableOptStat
  rdd.foreach(MyFunctions.processItem(enable) _)
})
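
The point of hoisting enable into a local val is that the closure shipped to 
the workers then only captures that Boolean and the top-level MyFunctions 
object, so nothing in it refers back to the class holding the StreamingContext.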



On Thu, Aug 7, 2014 at 11:52 AM, amit <amit.codenam...@gmail.com> wrote:
There is one more configuration option called spark.closure.serializer that
can be used to specify the serializer for closures.

Maybe in the class you have the StreamingContext as a field, so when Spark
tries to serialize the whole class it uses the spark.closure.serializer to
serialize even the streaming context. Classes like StreamingContext may not
work if serialized and deserialized in a different JVM(?).

So I see two solutions: one is to somehow avoid serializing the 
StreamingContext; the other is to override the default serialization method to 
serialize only the params required by the streaming context and recreate it 
from those params in the deserialization step.
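
A rough sketch of the second idea (the class, constructor params and method 
here are made up for illustration, and whether a rebuilt StreamingContext is 
actually usable on the other side is a separate question):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative only: keep just the params as serializable state and
// rebuild the context lazily after deserialization, instead of shipping
// the StreamingContext itself.
class ContextHolder(master: String, appName: String, batchSeconds: Long)
    extends Serializable {

  @transient private var ssc: StreamingContext = _

  def streamingContext: StreamingContext = {
    if (ssc == null) {
      val conf = new SparkConf().setMaster(master).setAppName(appName)
      ssc = new StreamingContext(conf, Seconds(batchSeconds))
    }
    ssc
  }
}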






