[ https://issues.apache.org/jira/browse/SPARK-4868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249387#comment-14249387 ]
Nicholas Chammas commented on SPARK-4868:
-----------------------------------------

cc [~tdas], [~adav]

> Twitter DStream.map() throws "Task not serializable"
> -----------------------------------------------------
>
>                 Key: SPARK-4868
>                 URL: https://issues.apache.org/jira/browse/SPARK-4868
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, Streaming
>    Affects Versions: 1.1.1
>        Environment: * Spark 1.1.1
> * EC2 cluster with 1 slave spun up using {{spark-ec2}}
> * twitter4j 3.0.3
> * {{spark-shell}} called with {{--jars}} argument to load {{spark-streaming-twitter_2.10-1.0.0.jar}} as well as all the twitter4j jars.
>            Reporter: Nicholas Chammas
>            Priority: Minor
>
> _(Continuing the discussion [started here on the Spark user list|http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-in-Spark-Streaming-td5725.html].)_
>
> The following Spark Streaming code throws a serialization exception I do not understand.
> {code}
> import twitter4j.auth.{Authorization, OAuthAuthorization}
> import twitter4j.conf.ConfigurationBuilder
> import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
>
> def getAuth(): Option[Authorization] = {
>   System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
>   System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
>   System.setProperty("twitter4j.oauth.accessToken", "accessToken")
>   System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
>   Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
> }
>
> def noop(a: Any): Any = {
>   a
> }
>
> val ssc = new StreamingContext(sc, Seconds(5))
> val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
> val liveTweets = liveTweetObjects.map(_.getText)
>
> liveTweets.map(t => noop(t)).print() // exception here
>
> ssc.start()
> {code}
> So before I even start the StreamingContext, I get the following stack trace:
> {code}
> scala> liveTweets.map(t => noop(t)).print()
> org.apache.spark.SparkException: Task not serializable
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>     at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
>     at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
>     at $iwC$$iwC$$iwC.<init>(<console>:32)
>     at $iwC$$iwC.<init>(<console>:34)
>     at $iwC.<init>(<console>:36)
>     at <init>(<console>:38)
>     at .<init>(<console>:42)
>     at .<clinit>(<console>)
>     at .<init>(<console>:7)
>     at .<clinit>(<console>)
>     at $print(<console>)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
>     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
>     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
>     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
>     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
>     at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
>     at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
>     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
>     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
>     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
>     at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
>     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
>     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
>     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
>     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
>     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
>     at org.apache.spark.repl.Main$.main(Main.scala:31)
>     at org.apache.spark.repl.Main.main(Main.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>     ... 43 more
> {code}
> What I'm really trying to do is use Spark Streaming via the interactive shell to filter Tweets using a trained KMeans model. I got errors trying that, and I boiled it down to this repro.
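
A note on what the trace seems to be saying, plus an untested sketch of a possible workaround: the closure {{t => noop(t)}} refers to {{noop}}, which is defined on a REPL line object ({{$iwC...}}), and that wrapper (directly or through earlier lines) also holds {{ssc}}, so serializing the closure pulls in the non-serializable StreamingContext. Passing a self-contained function value instead of calling a method on the line object may avoid that reference. The names below are illustrative, not from the report, and this has not been verified against 1.1.1:

{code}
// Untested sketch: a plain function value that captures nothing from the
// enclosing REPL wrappers, so serializing it should not drag in ssc.
val noopFn: String => String = (t: String) => t

// Pass the function value itself, rather than a new closure that calls a
// method defined on the REPL line that also references the StreamingContext.
liveTweets.map(noopFn).print()
{code}

If this still fails, the StreamingContext is being captured through some other path in the line wrappers and the sketch above would not help.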
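
As for the end goal described above (filtering tweets with a trained KMeans model), here is a rough sketch of what that pipeline might look like, re-using {{sc}} and {{liveTweets}} from the repro. The {{HashingTF}} featurization, the tiny inline training corpus, and the cluster id are purely illustrative assumptions, and in the shell the filter closure below could of course hit the same capture problem this issue describes:

{code}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.feature.HashingTF

// Hash tweet words into fixed-size feature vectors (illustrative featurization).
val hashingTF = new HashingTF(1000)

// Illustrative training corpus; in practice this would be a real set of tweets.
val trainingTexts = sc.parallelize(Seq("spark streaming is neat", "cats and dogs playing"))
val trainingVectors = trainingTexts.map(t => hashingTF.transform(t.split(" ").toSeq)).cache()
val model = KMeans.train(trainingVectors, 2, 10)

// Illustrative cluster id to keep.
val interestingCluster = 0

// Keep only tweets whose feature vector is assigned to the chosen cluster.
val filteredTweets = liveTweets.filter { text =>
  model.predict(hashingTF.transform(text.split(" ").toSeq)) == interestingCluster
}
filteredTweets.print()
{code}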