Yep, here goes!

Here are my environment vitals:

   - Spark 1.0.0
   - EC2 cluster with 1 slave spun up using spark-ec2
   - twitter4j 3.0.3
   - spark-shell called with --jars argument to load
   spark-streaming-twitter_2.10-1.0.0.jar as well as all the twitter4j
   jars.

Now, while I’m in the Spark shell, I enter the following:

import twitter4j.auth.{Authorization, OAuthAuthorization}
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
def getAuth(): Option[Authorization] = {

  System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
  System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
  System.setProperty("twitter4j.oauth.accessToken", "accessToken")
  System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")

  Some(new OAuthAuthorization(new ConfigurationBuilder().build()))

}
def noop(a: Any): Any = {
  a
}
val ssc = new StreamingContext(sc, Seconds(5))
val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
val liveTweets = liveTweetObjects.map(_.getText)

liveTweets.map(t => noop(t)).print()

ssc.start()

So basically, I’m just printing Tweets as-is, but first I map each one to
itself via noop(). The Tweets flow just fine for a minute or so, and then
this shows up:

14/07/24 23:13:30 ERROR JobScheduler: Error running job streaming job 1406243610000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The time-to-first-error is variable.

This is the simplest repro I can show at this time. Doing more complex
things with liveTweets that involve a KMeansModel, for example, gets
interrupted even more quickly by this java.io.NotSerializableException. I
don’t know if the root cause is the same, but the error certainly is.
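In case it helps anyone else hitting this: the workaround TD suggests below
(defining the function as a val instead of a def) would look like this in my
repro. This is just a sketch I haven’t verified end-to-end yet, and noopFn is
a name I’m making up here; the idea is that a plain function value shouldn’t
drag the shell’s enclosing object (which holds the non-serializable
StreamingContext) into the task closure the way a def apparently does:

val noopFn = (t: String) => t  // same as noop(), but as a function value

liveTweets.map(noopFn).print()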

By the way, trying to reproduce this on 1.0.1 doesn’t raise the same error,
but I can’t dig deeper to make sure this is really resolved (e.g. by trying
more complex things that need data) due to SPARK-2471
<https://issues.apache.org/jira/browse/SPARK-2471>. I see that that issue
has been resolved, so I’ll try this whole process again using the latest
from master and see how it goes.

Nick


On Tue, Jul 15, 2014 at 5:58 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> I am very curious though. Can you post a concise code example which we can
> run to reproduce this problem?
>
> TD
>
>
> On Tue, Jul 15, 2014 at 2:54 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> I am not entirely sure off the top of my head. But a possible workaround
>> (which usually works) is to define the function as a val instead of a def.
>> For example:
>>
>> def func(i: Int): Boolean = { true }
>>
>> can be written as
>>
>> val func = (i: Int) => { true }
>>
>> Hope this helps for now.
>>
>> TD
>>
>>
>> On Tue, Jul 15, 2014 at 9:21 AM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> Hey Diana,
>>>
>>> Did you ever figure this out?
>>>
>>> I’m running into the same exception, except in my case the function I’m
>>> calling is a KMeans model.predict().
>>>
>>> In regular Spark it works, and Spark Streaming without the call to
>>> model.predict() also works, but when put together I get this
>>> serialization exception. I’m on 1.0.0.
>>>
>>> Nick
>>>
>>>
>>>
>>> On Thu, May 8, 2014 at 6:37 AM, Diana Carroll <dcarr...@cloudera.com>
>>> wrote:
>>>
>>>> Hey all, trying to set up a pretty simple streaming app and getting
>>>> some weird behavior.
>>>>
>>>>  First, a non-streaming job that works fine:  I'm trying to pull out
>>>> lines of a log file that match a regex, for which I've set up a function:
>>>>
>>>> def getRequestDoc(s: String): String = { "KBDOC-[0-9]*".r.findFirstIn(s).orNull }
>>>> val logs = sc.textFile(logfiles)
>>>> logs.map(getRequestDoc).take(10)
>>>>
>>>> That works, but I want to run that on the same data, but streaming, so
>>>> I tried this:
>>>>
>>>> val logs = ssc.socketTextStream("localhost",4444)
>>>> logs.map(getRequestDoc).print()
>>>> ssc.start()
>>>>
>>>> From this code, I get:
>>>> 14/05/08 03:32:08 ERROR JobScheduler: Error running job streaming job
>>>> 1399545128000 ms.0
>>>> org.apache.spark.SparkException: Job aborted: Task not serializable:
>>>> java.io.NotSerializableException:
>>>> org.apache.spark.streaming.StreamingContext
>>>>
>>>>
>>>> But if I do the map function inline instead of calling a separate
>>>> function, it works:
>>>>
>>>> logs.map("KBDOC-[0-9]*".r.findFirstIn(_).orNull).print()
>>>>
>>>> So why is it able to serialize my little function in regular spark, but
>>>> not in streaming?
>>>>
>>>> Thanks,
>>>> Diana
>>>>
>>>>
>>>>
>>>
>>
>
