[ 
https://issues.apache.org/jira/browse/SPARK-4868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14568025#comment-14568025
 ] 

Tathagata Das commented on SPARK-4868:
--------------------------------------

There has been a few fixes regarding closure cleaner in Spark 1.4. Could you 
check it out.

> Twitter DStream.map() throws "Task not serializable"
> ----------------------------------------------------
>
>                 Key: SPARK-4868
>                 URL: https://issues.apache.org/jira/browse/SPARK-4868
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, Streaming
>    Affects Versions: 1.1.1, 1.2.0
>         Environment: * Spark 1.1.1 or 1.2.0
> * EC2 cluster with 1 slave spun up using {{spark-ec2}}
> * twitter4j 3.0.3
> * {{spark-shell}} called with {{--jars}} argument to load 
> {{spark-streaming-twitter_2.10-1.0.0.jar}} as well as all the twitter4j jars.
>            Reporter: Nicholas Chammas
>            Priority: Minor
>
> _(Continuing the discussion [started here on the Spark user 
> list|http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-in-Spark-Streaming-td5725.html].)_
> The following Spark Streaming code throws a serialization exception I do not 
> understand.
> {code}
> import twitter4j.auth.{Authorization, OAuthAuthorization}
> import twitter4j.conf.ConfigurationBuilder 
> import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
> def getAuth(): Option[Authorization] = {
>   System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
>   System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
>   System.setProperty("twitter4j.oauth.accessToken", "accessToken") 
>   System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
>   Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
> } 
> def noop(a: Any): Any = {
>   a
> }
> val ssc = new StreamingContext(sc, Seconds(5))
> val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
> val liveTweets = liveTweetObjects.map(_.getText)
> liveTweets.map(t => noop(t)).print()  // exception here
> ssc.start()
> {code}
> So before I even start the StreamingContext, I get the following stack trace:
> {code}
> scala> liveTweets.map(t => noop(t)).print()
> org.apache.spark.SparkException: Task not serializable
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>       at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
>       at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
>       at $iwC$$iwC$$iwC.<init>(<console>:32)
>       at $iwC$$iwC.<init>(<console>:34)
>       at $iwC.<init>(<console>:36)
>       at <init>(<console>:38)
>       at .<init>(<console>:42)
>       at .<clinit>(<console>)
>       at .<init>(<console>:7)
>       at .<clinit>(<console>)
>       at $print(<console>)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
>       at 
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
>       at 
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
>       at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
>       at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
>       at 
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
>       at 
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
>       at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
>       at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
>       at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
>       at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
>       at 
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
>       at 
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
>       at 
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
>       at 
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>       at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
>       at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
>       at org.apache.spark.repl.Main$.main(Main.scala:31)
>       at org.apache.spark.repl.Main.main(Main.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
>       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.NotSerializableException: 
> org.apache.spark.streaming.StreamingContext
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>       at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>       at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>       ... 43 more
> {code}
> What I'm really trying to do is use Spark Streaming via the interactive shell 
> to filter Tweets using a trained KMeans model. I got errors trying that, and 
> I boiled it down to this repro.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to