[ 
https://issues.apache.org/jira/browse/SPARK-4868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249582#comment-14249582
 ] 

Nicholas Chammas commented on SPARK-4868:
-----------------------------------------

Replacing {{noop()}} with this

{code}
object Test {
    def noop(a: Any): Any = {
      a
    }
}
{code}

and calling {{liveTweets.map(t => Test.noop(t)).print()}} yields a similar 
stack trace.

Creating the streaming context as follows

{code}
@transient val ssc = new StreamingContext(sc, Seconds(5))
{code}

and trying either the original {{noop()}} or {{Test.noop()}} (with REPL 
restarts in between) yields a slightly more interesting trace:

{code}
scala> liveTweets.map(t => Test.noop(t)).print()  // exception here
org.apache.spark.SparkException: Task not serializable
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
        at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
        at $iwC$$iwC$$iwC.<init>(<console>:32)
        at $iwC$$iwC.<init>(<console>:34)
        at $iwC.<init>(<console>:36)
        at <init>(<console>:38)
        at .<init>(<console>:42)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
        at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
        at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
        at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
        at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
        at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
        at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: Object of 
org.apache.spark.streaming.twitter.TwitterInputDStream is being serialized  
possibly as a part of closure of an RDD operation. This is because  the DStream 
object is being referred to from within the closure.  Please rewrite the RDD 
operation inside this DStream to avoid this.  This has been enforced to avoid 
bloating of Spark tasks  with unnecessary objects.
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:416)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
        at 
org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 43 more
{code}

So the interesting part is this:

{quote}
Caused by: java.io.NotSerializableException: Object of 
org.apache.spark.streaming.twitter.TwitterInputDStream is being serialized  
possibly as a part of closure of an RDD operation. This is because  the DStream 
object is being referred to from within the closure.  Please rewrite the RDD 
operation inside this DStream to avoid this.  This has been enforced to avoid 
bloating of Spark tasks  with unnecessary objects.
{quote}

So somehow the no-op is generating a reference to the enclosing DStream?

> Twitter DStream.map() throws "Task not serializable"
> ----------------------------------------------------
>
>                 Key: SPARK-4868
>                 URL: https://issues.apache.org/jira/browse/SPARK-4868
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, Streaming
>    Affects Versions: 1.1.1
>         Environment: * Spark 1.1.1
> * EC2 cluster with 1 slave spun up using {{spark-ec2}}
> * twitter4j 3.0.3
> * {{spark-shell}} called with {{--jars}} argument to load 
> {{spark-streaming-twitter_2.10-1.0.0.jar}} as well as all the twitter4j jars.
>            Reporter: Nicholas Chammas
>            Priority: Minor
>
> _(Continuing the discussion [started here on the Spark user 
> list|http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-in-Spark-Streaming-td5725.html].)_
> The following Spark Streaming code throws a serialization exception I do not 
> understand.
> {code}
> import twitter4j.auth.{Authorization, OAuthAuthorization}
> import twitter4j.conf.ConfigurationBuilder 
> import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
> def getAuth(): Option[Authorization] = {
>   System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
>   System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
>   System.setProperty("twitter4j.oauth.accessToken", "accessToken") 
>   System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
>   Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
> } 
> def noop(a: Any): Any = {
>   a
> }
> val ssc = new StreamingContext(sc, Seconds(5))
> val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
> val liveTweets = liveTweetObjects.map(_.getText)
> liveTweets.map(t => noop(t)).print()  // exception here
> ssc.start()
> {code}
> So before I even start the StreamingContext, I get the following stack trace:
> {code}
> scala> liveTweets.map(t => noop(t)).print()
> org.apache.spark.SparkException: Task not serializable
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>       at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
>       at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
>       at $iwC$$iwC$$iwC.<init>(<console>:32)
>       at $iwC$$iwC.<init>(<console>:34)
>       at $iwC.<init>(<console>:36)
>       at <init>(<console>:38)
>       at .<init>(<console>:42)
>       at .<clinit>(<console>)
>       at .<init>(<console>:7)
>       at .<clinit>(<console>)
>       at $print(<console>)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
>       at 
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
>       at 
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
>       at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
>       at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
>       at 
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
>       at 
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
>       at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
>       at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
>       at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
>       at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
>       at 
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
>       at 
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
>       at 
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
>       at 
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>       at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
>       at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
>       at org.apache.spark.repl.Main$.main(Main.scala:31)
>       at org.apache.spark.repl.Main.main(Main.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
>       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.NotSerializableException: 
> org.apache.spark.streaming.StreamingContext
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>       at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>       at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>       ... 43 more
> {code}
> What I'm really trying to do is use Spark Streaming via the interactive shell 
> to filter Tweets using a trained KMeans model. I got errors trying that, and 
> I boiled it down to this repro.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to