NotSerializableException with Trait
Hello,

I have a few Spark jobs that all do the same aggregations, and I want to factor the aggregation logic out into one place. For that I tried using a trait. When I run the job that extends my trait (on YARN, in client mode), I get a NotSerializableException (see attachment). If I move the same code into an object instead, the job runs fine with no NotSerializableException. Could you explain why? I don't understand this behavior.

Thanks
Jean

--

object SparkJob extends App {
  val conf = new SparkConf()
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("aggregateAdTechImpressions")
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()
  ...
  val impressionsAdtechDF = MyUtil.prepareAggregationDataFrame(impressionsAdtechRawDF, "timestamp")
  val impressionsAggregationDF: DataFrame = MyUtil.aggregateImpressions(impressionsAdtechDF)
  ...
}

object MyUtil {
  private def parseTs(ts: Int): Int = {
    val tsMilli: Long = ts.toLong * 1000L
    val date: Date = new Date(tsMilli)
    val dateFormat = new SimpleDateFormat("MMdd")
    val dateStr = dateFormat.format(date)
    if (dateStr == null) 19000101 else dateStr.toInt
  }

  private def udfParseTs: UserDefinedFunction = udf(parseTs _)

  def prepareAggregationDataFrame(rawDF: DataFrame, timestampColumnName: String): DataFrame = {
    rawDF
      .withColumn("original_placement_id", col("placementid"))
      .withColumn("date", udfParseTs(col(timestampColumnName)))
      .withColumn("placement_id", col("placementid") cast StringType)
      .withColumnRenamed("campaignid", "campaign_id")
      .withColumnRenamed("placementSizeTypeId", "size_id")
      .drop("placementid")
      .drop(timestampColumnName)
  }

  def aggregateImpressions(inputDF: DataFrame): DataFrame = {
    inputDF.groupBy(
        col("date"),
        col("campaign_id"),
        col("original_placement_id"),
        col("placement_id"),
        col("size_id"))
      .agg(count(lit(1)).alias("cnt"))
      .withColumn("type", lit(1))
      .withColumn("revenue_chf", lit(0) cast DoubleType)
      .withColumn("revenue_eur", lit(0) cast DoubleType)
      .withColumn("source", lit(0)) // 0 for AdTech
  }
}

object SparkJob2 extends App with MyTrait {
  val conf = new SparkConf()
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("aggregateAdTechImpressions")
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()
  ...
  val impressionsAdtechDF = prepareAggregationDataFrame(impressionsAdtechRawDF, "timestamp")
  val impressionsAggregationDF: DataFrame = aggregateImpressions(impressionsAdtechDF)
  ...
}

trait MyTrait {
  // exact same members as MyUtil above: parseTs, udfParseTs,
  // prepareAggregationDataFrame and aggregateImpressions
}

[attachment: spark-NotSerializableException.log]
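The usual explanation for this difference: a trait's methods compile to instance methods, so `udf(parseTs _)` eta-expands into a function that captures `this`, which at runtime is the SparkJob2 object mixing the trait in, along with every field it holds (the SparkConf, the SparkSession, and anything else an App body accumulates). Methods on a standalone object need no outer instance, so the same expression yields a self-contained, serializable closure. A minimal sketch of a fix that keeps the trait, assuming Spark 2.x: leave the DataFrame-level methods in the trait and move the function the UDF wraps into the trait's companion object (companions can see each other's private members).

    import java.text.SimpleDateFormat
    import java.util.Date

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.functions.{col, udf}

    trait MyTrait {
      import MyTrait._

      def prepareAggregationDataFrame(rawDF: DataFrame, timestampColumnName: String): DataFrame = {
        // remaining withColumn/withColumnRenamed/drop calls as in the original
        rawDF.withColumn("date", udfParseTs(col(timestampColumnName)))
      }
    }

    object MyTrait {
      // lives on the standalone module, so udf(parseTs _) captures no
      // reference to whatever class eventually mixes the trait in
      private def parseTs(ts: Int): Int = {
        val dateStr = new SimpleDateFormat("MMdd").format(new Date(ts.toLong * 1000L))
        if (dateStr == null) 19000101 else dateStr.toInt
      }

      private val udfParseTs: UserDefinedFunction = udf(parseTs _)
    }

This is the same companion-object pattern suggested for a different thread further down this digest.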
Re: Spark Streaming - NotSerializableException: Methods & Closures:
You could also try to put transform in a companion object.

On Fri, 8 Apr 2016 16:48, mpawashe wrote:
> The class declaration is already marked Serializable ("with Serializable")
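A sketch of that suggestion, with made-up names, assuming a DStream[String]: methods on a companion object need no enclosing instance, so the argument to map() serializes without dragging the class (and its DStream field) along.

    import org.apache.spark.streaming.dstream.DStream

    class MyApp(stream: DStream[String]) {
      def run(): Unit = {
        // MyApp.transform belongs to the companion object, so the closure
        // passed to map() holds no reference to `this`
        stream.map(MyApp.transform).print()
      }
    }

    object MyApp {
      def transform(x: String): String = x + " foo "
    }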
Re: Spark Streaming - NotSerializableException: Methods & Closures:
The class declaration is already marked Serializable ("with Serializable").
Re: Spark Streaming - NotSerializableException: Methods & Closures:
You can declare your class serializable, as Spark will want to serialize the whole class.
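That suggestion, sketched with a made-up class name (note the poster replies elsewhere in the thread that the class is already marked Serializable):

    // with the enclosing class Serializable, Spark can ship the instance
    // that a method reference such as stream.map(transform) captures
    class MyStreamingJob extends Serializable {
      def transform(x: String): String = x + " foo "
    }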
Re: Spark Streaming - NotSerializableException: Methods & Closures:
Hi. I am using Scala 2.10.4 and 1.6.0 for the Spark-related dependencies. I am also using spark-streaming-kafka and including kafka (0.8.1.1), which apparently is needed for deserializers.

> On Apr 4, 2016, at 6:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> bq. I'm on version 2.10 for spark
>
> The above is the Scala version.
> Can you give us the Spark version?
>
> Thanks
Re: Spark Streaming - NotSerializableException: Methods & Closures:
bq. I'm on version 2.10 for spark

The above is the Scala version.
Can you give us the Spark version?

Thanks
Spark Streaming - NotSerializableException: Methods & Closures:
Hi all,

I am using the Spark Streaming API (I'm on version 2.10 for spark and streaming), and I am running into a function serialization issue that I do not run into when using Spark in batch (non-streaming) mode.

If I write code like this:

    def run(): Unit = {
      val newStream = stream.map(x => { x + " foo " })
      // ...
    }

everything works fine. But if I try it like this:

    def transform(x: String): String = { x + " foo " }

    def run(): Unit = {
      val newStream = stream.map(transform)
      // ...
    }

the program fails, being unable to serialize the closure (to my understanding, a method passed where a function is expected should be auto-converted to a closure).

However, it works fine if I declare a closure inside run() and use that, like so:

    val transform = (x: String) => { x + " foo " }

If it's declared outside of run(), however, it will also crash.

This is an example stack trace of the error I'm running into. This can be a hassle to debug, so I hope I don't have to get around it by using a local closure/function every time. Thanks for any help in advance.

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:266)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
    at com.my.cool.app.MyClass.run(MyClass.scala:90)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized.
Serialization stack:

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 20 more
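The working local-val variant the poster describes, sketched (assuming a DStream[String]): a function value declared locally captures nothing but its own body, whereas a method reference on a class must capture `this`, pulling the whole enclosing class, DStream field included, into the serialized closure.

    import org.apache.spark.streaming.dstream.DStream

    object Workaround {
      def run(stream: DStream[String]): Unit = {
        // a plain serializable function value; no reference to any enclosing class
        val transform = (x: String) => x + " foo "
        stream.map(transform).print()
      }
    }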
Re: Using a non serializable third party JSON serializable on a spark worker node throws NotSerializableException
Could you show the full companion object? It looks weird to have `override` in a companion object of a case class.

On Tue, Mar 1, 2016 at 11:16 AM, Yuval Itzchakov <yuva...@gmail.com> wrote:
> As I said, it is the method which eventually serializes the object. It is
> declared inside a companion object of a case class.
>
> The problem is that Spark will still try to serialize the method, as it
> needs to execute on the worker. How will that change the fact that
> `EncodeJson[T]` is not serializable?
Re: Using a non serializable third party JSON serializable on a spark worker node throws NotSerializableException
As I said, it is the method which eventually serializes the object. It is declared inside a companion object of a case class.

The problem is that Spark will still try to serialize the method, as it needs to execute on the worker. How will that change the fact that `EncodeJson[T]` is not serializable?

On Tue, Mar 1, 2016, 21:12 Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
> Don't know where "argonaut.EncodeJson$$anon$2" comes from. However, you
> can always put your code into a method of an "object". Then just call it
> like a Java static method.
Re: Using a non serializable third party JSON serializable on a spark worker node throws NotSerializableException
Don't know where "argonaut.EncodeJson$$anon$2" comes from. However, you can always put your code into a method of an "object". Then just call it like a Java static method.
Using a non serializable third party JSON serializable on a spark worker node throws NotSerializableException
I have a small snippet of code which relies on argonaut <http://argonaut.io/> for JSON serialization and is run from a `PairRDDFunctions.mapWithState` once a session is completed.

This is the code snippet (not that important):

    override def sendMessage(pageView: PageView): Unit = {
      Future {
        LogHolder.logger.info(s"Sending pageview: ${pageView.id} to automation")
        try {
          Http(url)
            .postData(pageView.asJson.toString)
            .option(HttpOptions.connTimeout(timeOutMilliseconds))
            .asString
            .throwError
        }
        catch {
          case NonFatal(e) => LogHolder.logger.error("Failed to send pageview", e)
        }
      }
    }

argonaut relies on a user implementation of a trait called `EncodeJson[T]`, which tells argonaut how to serialize and deserialize the object.

The problem is that the trait `EncodeJson[T]` is not serializable, thus throwing a NotSerializableException:

Caused by: java.io.NotSerializableException: argonaut.EncodeJson$$anon$2
Serialization stack:
    - object not serializable (class: argonaut.EncodeJson$$anon$2, value: argonaut.EncodeJson$$anon$2@6415f61e)

This is obvious and understandable.

The question I have is: what possible ways are there to work around this? I currently depend on a third-party library which I can't change to implement Serializable in any way. I've seen this StackOverflow answer <http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou> but couldn't implement any reasonable workaround.

Anyone have any ideas?
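One workaround that often fits this situation, sketched below (PageView and its fields are invented stand-ins, and jencode2L is argonaut's two-field encoder combinator): keep the non-serializable EncodeJson in the case class companion behind a lazy val. Object initializers run once per JVM, so each executor builds its own encoder on first use and the instance never has to travel from the driver.

    import argonaut._, Argonaut._

    case class PageView(id: String, url: String)

    object PageView {
      // constructed lazily, once per executor JVM; never serialized or shipped
      implicit lazy val encoder: EncodeJson[PageView] =
        jencode2L((p: PageView) => (p.id, p.url))("id", "url")
    }

Inside the mapWithState callback, `pageView.asJson` then resolves the implicit through the companion, which the compiled closure references as a static access rather than as a captured value.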
Re: NotSerializableException exception while using TypeTag in Scala 2.10
I also hit this bug. Have you resolved the issue, or could you give some suggestions?

2014-07-28 18:33 GMT+08:00 Aniket Bhatnagar:
> I am trying to serialize objects contained in RDDs using runtime
> reflection via TypeTag. However, the Spark job keeps failing with
> java.io.NotSerializableException on an instance of TypeCreator
> (auto-generated by the compiler to enable TypeTags). Is there any
> workaround for this without switching to Scala 2.11?
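A generic pattern that can sidestep this, sketched under the assumption that what the closure really needs can be computed from the TypeTag on the driver: do the reflection driver-side and capture only a plain serializable value, never the TypeTag itself (whose compiler-generated TypeCreator is what fails to serialize).

    import scala.reflect.runtime.universe._

    import org.apache.spark.rdd.RDD

    object TypeTagWorkaround {
      def describe[T: TypeTag](rdd: RDD[T]): Unit = {
        val typeName = typeOf[T].toString // evaluated on the driver; a plain String
        rdd.foreachPartition { it =>
          // the closure captures only typeName, not the TypeTag/TypeCreator
          println(s"partition of $typeName holds ${it.size} elements")
        }
      }
    }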
RegressionModelEvaluator (from jpmml) NotSerializableException when instantiated in the driver
I am trying to load a PMML file in a Spark job, instantiate it only once, and pass it to the executors. But I get a NotSerializableException for org.xml.sax.helpers.LocatorImpl, which is used inside jpmml.

I have this class Prediction.java:

    public class Prediction implements Serializable {
        private RegressionModelEvaluator rme;

        public Prediction() throws Exception {
            InputStream is = .getResourceAsStream("model.pmml");
            Source source = ImportFilter.apply(new InputSource(is));
            PMML model = JAXBUtil.unmarshalPMML(source);
            rme = new RegressionModelEvaluator(model);
            is.close();
        }

        public Map predict(params) {
            ..
            return rme.evaluate(params);
        }
    }

Now I want to instantiate it only once, since the "JAXBUtil.unmarshalPMML(source)" step takes about 2-3 seconds. It works fine if I instantiate it inside the map{}.

So I do this in my driver:

    Prediction prediction = new Prediction();
    JavaRDD result = rdd1
        .cartesian(rdd2)
        .map(t -> {...)

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)

Am I doing this right?

--
Thanks,
-Utkarsh
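A common way to get "once per executor rather than once per record" without shipping the object, sketched here in Scala (Prediction stands for the poster's wrapper class): hold the evaluator in an object behind a lazy val, so each executor JVM builds it on first use and the driver never serializes it.

    object PredictionHolder {
      // initialized on first access inside each executor JVM, so the 2-3 second
      // unmarshalling cost is paid once per executor, not once per record
      lazy val prediction: Prediction = new Prediction()
    }

    // usage inside the job, sketched:
    // rdd1.cartesian(rdd2).map { t => PredictionHolder.prediction.predict(...) }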
NotSerializableException in spark 1.4.0
The streaming job has been running OK in 1.2 and 1.3. After I upgraded to 1.4, I started seeing the error below. It appears that it fails in the validate method in StreamingContext. Did anything change in 1.4.0 w.r.t. DStream checkpointing?

Detailed error from the driver:

15/07/15 18:00:39 ERROR yarn.ApplicationMaster: User class threw exception: java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
Serialization stack:
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
Serialization stack:

    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)

--
Chen Song
Re: NotSerializableException in spark 1.4.0
Can you show us your function(s)?

Thanks
Re: NotSerializableException in spark 1.4.0
Your streaming job may have seemed to be running OK, but the DStream checkpointing must have been failing in the background. It would have been visible in the log4j logs. In 1.4.0, we enabled fast failure for that so that checkpointing failures don't stay hidden in the background.

The fact that the serialization stack is not shown correctly is a known bug in Spark 1.4.0, but it is fixed in 1.4.1, about to come out in the next couple of days. That should help you narrow down the culprit preventing serialization.
Re: NotSerializableException in spark 1.4.0
Ah, cool. Thanks.
Re: NotSerializableException in spark 1.4.0
Thanks. Can you point me to the patch that fixes the serialization stack? Maybe I can pull it in and rerun my job.

Chen
Re: NotSerializableException in spark 1.4.0
Spark 1.4.1 just got released! So just download that. Yay for timing.
Re: NotSerializableException in spark 1.4.0
Should be this one:

[SPARK-7180] [SPARK-8090] [SPARK-8091] Fix a number of SerializationDebugger bugs and limitations
...
Closes #6625 from tdas/SPARK-7180 and squashes the following commits:
NotSerializableException: org.apache.http.impl.client.DefaultHttpClient when trying to send documents to Solr
I'm using SolrJ in a Spark program. When I try to send the docs to Solr, I get a NotSerializableException on the DefaultHttpClient. Is there a possible fix or workaround? I'm using Spark 1.2.1 with Hadoop 2.4; SolrJ is version 4.0.0.

    final HttpSolrServer solrServer = new HttpSolrServer(SOLR_SERVER_URL);
    ...
    JavaRDD<SolrInputDocument> solrDocs = rdd.map(new Function<Row, SolrInputDocument>() {
        public SolrInputDocument call(Row r) {
            return r.toSolrDocument();
        }
    });

    solrDocs.foreachPartition(new VoidFunction<Iterator<SolrInputDocument>>() {
        public void call(Iterator<SolrInputDocument> solrDocIterator) throws Exception {
            List<SolrInputDocument> batch = new ArrayList<SolrInputDocument>();
            while (solrDocIterator.hasNext()) {
                SolrInputDocument inputDoc = solrDocIterator.next();
                batch.add(inputDoc);
                if (batch.size() >= batchSize) {
                    Utils.sendBatchToSolr(solrServer, solrCollection, batch);
                }
            }
            if (!batch.isEmpty()) {
                Utils.sendBatchToSolr(solrServer, solrCollection, batch);
            }
        }
    });

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:789)
    at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:195)
    at org.apache.spark.api.java.JavaRDD.foreachPartition(JavaRDD.scala:32)
    at com.kona.motivis.spark.proto.SparkProto.execute(SparkProto.java:158)
    at com.kona.motivis.spark.proto.SparkProto.main(SparkProto.java:186)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.http.impl.client.DefaultHttpClient
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
RE: NotSerializableException: org.apache.http.impl.client.DefaultHttpClient when trying to send documents to Solr
You need to instantiate the server in the foreachPartition block or Spark will attempt to serialize it to the task. See the design patterns section in the Spark Streaming guide.

Jose Fernandez | Principal Software Developer
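The advice above, sketched in Scala (solrUrl, batchSize, solrCollection and Utils.sendBatchToSolr are taken from the poster's code and assumed to be available; SolrJ 4.x classes as in the original): because the client is constructed inside foreachPartition, it is born on the executor and never crosses the serialization boundary.

    import scala.collection.JavaConverters._

    import org.apache.solr.client.solrj.impl.HttpSolrServer
    import org.apache.solr.common.SolrInputDocument
    import org.apache.spark.rdd.RDD

    object SolrIndexer {
      def index(solrDocs: RDD[SolrInputDocument], solrUrl: String,
                solrCollection: String, batchSize: Int): Unit =
        solrDocs.foreachPartition { docs =>
          // created per partition, on the executor; never serialized
          val solrServer = new HttpSolrServer(solrUrl)
          docs.grouped(batchSize).foreach { batch =>
            // Utils.sendBatchToSolr is the poster's helper
            Utils.sendBatchToSolr(solrServer, solrCollection, batch.asJava)
          }
          solrServer.shutdown()
        }
    }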
Re: NotSerializableException in Spark Streaming
This still seems to be broken. In 1.1.1, it errors immediately on this line (from the above repro script):

    liveTweets.map(t => noop(t)).print()

The stack trace is:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
    at $iwC$$iwC$$iwC.<init>(<console>:32)
    at $iwC$$iwC.<init>(<console>:34)
    at $iwC.<init>(<console>:36)
    at <init>(<console>:38)
    at .<init>(<console>:42)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
Re: JavaStreamingContextFactory checkpoint directory NotSerializableException
Thanks for pointing to the issue. Yes, I think it's the same issue; below is the exception.

ERROR OneForOneStrategy: TestCheckpointStreamingJson$1
java.io.NotSerializableException: TestCheckpointStreamingJson
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:184)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Regards,
Vasu C

On Thu, Nov 6, 2014 at 1:14 PM, Sean Owen <so...@cloudera.com> wrote:
> You didn't say what isn't serializable or where the exception occurs,
> but is it the same as this issue?
> https://issues.apache.org/jira/browse/SPARK-4196
>
> On Thu, Nov 6, 2014 at 5:42 AM, Vasu C <vasuc.bigd...@gmail.com> wrote:
>> Dear All,
>>
>> I am getting java.io.NotSerializableException for the code below. If
>> jssc.checkpoint(HDFS_CHECKPOINT_DIR); is commented out, there is no
>> exception. Please help.
>>
>> JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
>>     @Override
>>     public JavaStreamingContext create() {
>>         SparkConf sparkConf = new SparkConf().set("spark.cores.max", "3");
>>         final JavaStreamingContext jssc = new JavaStreamingContext(
>>                 sparkConf, new Duration(300));
>>         final JavaHiveContext javahiveContext = new JavaHiveContext(jssc.sc());
>>         javahiveContext.createParquetFile(Bean.class, IMPALA_TABLE_LOC,
>>                 true, new Configuration())
>>                 .registerTempTable(TEMP_TABLE_NAME);
>>         // TODO create checkpoint directory for fault tolerance
Re: JavaStreamingContextFactory checkpoint directory NotSerializableException
No, not the same thing then. This just means you accidentally have a reference to the non-serializable enclosing test class in your code. Just make sure that reference is severed.

On Thu, Nov 6, 2014 at 8:00 AM, Vasu C vasuc.bigd...@gmail.com wrote: Thanks for pointing to the issue. Yes, I think it's the same issue; below is the exception: ERROR OneForOneStrategy: TestCheckpointStreamingJson$1 java.io.NotSerializableException: TestCheckpointStreamingJson ...
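For context, a minimal Scala sketch of the pattern Sean is describing (hypothetical names, not Vasu's actual test class): a function defined on a non-serializable enclosing class is captured by the closure, and moving it to a standalone object severs the reference.

    import org.apache.spark.rdd.RDD

    class TestHarness {                       // not Serializable
      def tag(s: String): String = s + "!"
      def bad(rdd: RDD[String]): RDD[String] =
        rdd.map(tag)                          // really this.tag: the closure captures TestHarness
    }

    object Tagger {                           // top-level object, nothing else to drag in
      def tag(s: String): String = s + "!"
    }

    class FixedHarness {
      def good(rdd: RDD[String]): RDD[String] =
        rdd.map(Tagger.tag)                   // the closure references only Tagger
    }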
Re: JavaStreamingContextFactory checkpoint directory NotSerializableException
Hi Sean, Below is my Java code, running on Spark 1.1.0. I am still getting the same error. Here the Bean class is serializable; I am not sure where exactly the problem is. What am I doing wrong here?

public class StreamingJson {
    public static void main(String[] args) throws Exception {
        final String HDFS_FILE_LOC = args[0];
        final String IMPALA_TABLE_LOC = args[1];
        final String TEMP_TABLE_NAME = args[2];
        final String HDFS_CHECKPOINT_DIR = args[3];

        JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                SparkConf sparkConf = new SparkConf().setAppName("test")
                        .set("spark.cores.max", "3");
                final JavaStreamingContext jssc = new JavaStreamingContext(
                        sparkConf, new Duration(500));
                final JavaHiveContext javahiveContext = new JavaHiveContext(jssc.sc());
                javahiveContext.createParquetFile(Bean.class, IMPALA_TABLE_LOC,
                        true, new Configuration())
                        .registerTempTable(TEMP_TABLE_NAME);
                final JavaDStream<String> textFileStream = jssc.textFileStream(HDFS_FILE_LOC);
                textFileStream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
                    @Override
                    public Void call(JavaRDD<String> rdd, Time time) throws Exception {
                        if (rdd != null) {
                            if (rdd.count() > 0) {
                                JavaSchemaRDD schRdd = javahiveContext.jsonRDD(rdd);
                                schRdd.insertInto(TEMP_TABLE_NAME);
                            }
                        }
                        return null;
                    }
                });
                jssc.checkpoint(HDFS_CHECKPOINT_DIR);
                return jssc;
            }
        };
        JavaStreamingContext context = JavaStreamingContext.getOrCreate(
                HDFS_CHECKPOINT_DIR, contextFactory);
        context.start(); // Start the computation
        context.awaitTermination();
    }
}

Regards, Vasu C

On Thu, Nov 6, 2014 at 1:33 PM, Sean Owen so...@cloudera.com wrote: No, not the same thing then. This just means you accidentally have a reference to the non-serializable enclosing test class in your code. ...
Re: JavaStreamingContextFactory checkpoint directory NotSerializableException
Erm, you are trying to do all the work in the create() method. This is definitely not what you want to do; it is just supposed to make the JavaStreamingContext. A further problem is that you're using anonymous inner classes, which are non-static and contain a reference to the outer class. The closure cleaner can sometimes get rid of that, but perhaps not here. Consider a static inner class if you can't resolve it other ways. There is probably at least one more issue in this code too...

On Thu, Nov 6, 2014 at 1:43 PM, Vasu C vasuc.bigd...@gmail.com wrote: Hi Sean, Below is my Java code, running on Spark 1.1.0. I am still getting the same error. Here the Bean class is serializable; I am not sure where exactly the problem is. What am I doing wrong here? ...
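A sketch of the separation Sean describes, written in Scala with hypothetical names (the thread's code is Java; this is not Vasu's program): the factory only builds and wires the context, and the per-batch work lives in a top-level object so no anonymous inner class captures the enclosing class.

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Duration, StreamingContext, Time}

    object BatchHandler extends Serializable {   // standalone: no outer reference
      def handle(rdd: RDD[String], time: Time): Unit =
        if (rdd.count() > 0) {
          // insert into the temp table here
        }
    }

    def createContext(checkpointDir: String, inputDir: String): StreamingContext = {
      val ssc = new StreamingContext(new SparkConf().setAppName("test"), new Duration(500))
      ssc.textFileStream(inputDir).foreachRDD(BatchHandler.handle _)
      ssc.checkpoint(checkpointDir)
      ssc
    }

    // usage, mirroring getOrCreate in the Java code:
    // val context = StreamingContext.getOrCreate(dir, () => createContext(dir, input))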
JavaStreamingContextFactory checkpoint directory NotSerializableException
Dear All, I am getting java.io.NotSerializableException for the code below. If the jssc.checkpoint(HDFS_CHECKPOINT_DIR) call is removed, there is no exception. Please help.

JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        SparkConf sparkConf = new SparkConf().set("spark.cores.max", "3");
        final JavaStreamingContext jssc = new JavaStreamingContext(
                sparkConf, new Duration(300));
        final JavaHiveContext javahiveContext = new JavaHiveContext(jssc.sc());
        javahiveContext.createParquetFile(Bean.class, IMPALA_TABLE_LOC,
                true, new Configuration())
                .registerTempTable(TEMP_TABLE_NAME);
        // TODO create checkpoint directory for fault tolerance
        final JavaDStream<String> textFileStream = jssc.textFileStream(HDFS_FILE_LOC);
        textFileStream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
            @Override
            public Void call(JavaRDD<String> rdd, Time time) throws Exception {
                if (rdd != null) {
                    if (rdd.count() > 0) {
                        JavaSchemaRDD schRdd = javahiveContext.jsonRDD(rdd);
                        schRdd.insertInto(TEMP_TABLE_NAME);
                    }
                }
                return null;
            }
        });
        jssc.checkpoint(HDFS_CHECKPOINT_DIR);
        return jssc;
    }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(
        HDFS_CHECKPOINT_DIR, contextFactory);
context.start(); // Start the computation
context.awaitTermination();

Regards, Vasu
Re: JavaStreamingContextFactory checkpoint directory NotSerializableException
You didn't say what isn't serializable or where the exception occurs, but is it the same as this issue? https://issues.apache.org/jira/browse/SPARK-4196

On Thu, Nov 6, 2014 at 5:42 AM, Vasu C vasuc.bigd...@gmail.com wrote: Dear All, I am getting java.io.NotSerializableException for the code below. If the jssc.checkpoint(HDFS_CHECKPOINT_DIR) call is removed, there is no exception. Please help. ...
NotSerializableException: org.apache.spark.sql.hive.api.java.JavaHiveContext
Hello All, I am trying to query a Hive table using Spark SQL from my Java code, but I am getting the following error:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.hive.api.java.JavaHiveContext
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

I am using Spark 1.0.2. My code snippet is as below:

JavaHiveContext hiveContext = null;
JavaSparkContext jsCtx = ..;
hiveContext = new JavaHiveContext(jsCtx);
hiveContext.hql("select col1, col2 from table1");

Usually people suggest not passing any non-serializable object into a Spark closure function (map, reduce, etc.) to avoid it being distributed across multiple machines. But I am not using any closure functions here, so I am not sure how to handle this issue. Can you please advise how to resolve this problem? Thanks Bijoy
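No resolution appears in this thread, but a common pattern behind this class of error (an assumption here, not a confirmed diagnosis of Bijoy's case) is that the context object ends up reachable from something Spark later serializes. Keeping the context as a local variable of the driver's main method, rather than a field of some long-lived class, guarantees it can never be dragged into a serialized graph. A minimal Scala sketch under that assumption:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    object QueryJob {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("hive-query"))
        // Locals of main can never become fields of an object that a task
        // closure captures, so the context stays strictly on the driver.
        val hiveContext = new HiveContext(sc)
        hiveContext.hql("select col1, col2 from table1").collect().foreach(println)
      }
    }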
NotSerializableException while doing rdd.saveToCassandra
Hi All, I am using spark-1.0.0 to parse a JSON file and save the values to Cassandra using a case class. My code looks as follows:

case class LogLine(x1: Option[String], x2: Option[String], x3: Option[List[String]], x4: Option[String], x5: Option[String], x6: Option[String], x7: Option[String], x8: Option[String], x9: Option[String])

val data = test.map(line => {
  parse(line)
}).map(json => {
  // Extract the values
  implicit lazy val formats = org.json4s.DefaultFormats
  val x1 = (json \ "x1").extractOpt[String]
  val x2 = (json \ "x2").extractOpt[String]
  val x4 = (json \ "x4").extractOpt[String]
  val x5 = (json \ "x5").extractOpt[String]
  val x6 = (json \ "x6").extractOpt[String]
  val x7 = (json \ "x7").extractOpt[String]
  val x8 = (json \ "x8").extractOpt[String]
  val x3 = (json \ "x3").extractOpt[List[String]]
  val x9 = (json \ "x9").extractOpt[String]
  LogLine(x1, x2, x3, x4, x5, x6, x7, x8, x9)
})

data.saveToCassandra("test", "test_data", Seq("x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9"))

whereas the Cassandra table schema is as follows:

CREATE TABLE test_data (
  x1 varchar,
  x2 varchar,
  x4 varchar,
  x5 varchar,
  x6 varchar,
  x7 varchar,
  x8 varchar,
  x3 list<text>,
  x9 varchar,
  PRIMARY KEY (x1));

I am getting the following error on executing the saveToCassandra statement:

14/08/27 11:33:59 INFO SparkContext: Starting job: runJob at package.scala:169
14/08/27 11:33:59 INFO DAGScheduler: Got job 5 (runJob at package.scala:169) with 1 output partitions (allowLocal=false)
14/08/27 11:33:59 INFO DAGScheduler: Final stage: Stage 5(runJob at package.scala:169)
14/08/27 11:33:59 INFO DAGScheduler: Parents of final stage: List()
14/08/27 11:33:59 INFO DAGScheduler: Missing parents: List()
14/08/27 11:33:59 INFO DAGScheduler: Submitting Stage 5 (MappedRDD[7] at map at console:45), which has no missing parents
14/08/27 11:33:59 INFO DAGScheduler: Failed to run runJob at package.scala:169
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkConf

The failing statement is the data.saveToCassandra(...) call above, where the data field is org.apache.spark.rdd.RDD[LogLine] = MappedRDD[7] at map at console:45. How can I make this serializable, or is this a different problem? Please advise. Regards, lmk
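A sketch of one common remedy, under the assumption (not confirmed in the thread) that a NotSerializableException on SparkConf means the map closure captured an enclosing object that holds the SparkConf or SparkContext: move the parsing into a top-level object so the closure has nothing else to capture. This reuses the LogLine case class above and assumes the json4s jackson backend.

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    object LogLineParser extends Serializable {
      implicit val formats: Formats = DefaultFormats

      def fromJson(line: String): LogLine = {
        val json = parse(line)
        LogLine(
          (json \ "x1").extractOpt[String],
          (json \ "x2").extractOpt[String],
          (json \ "x3").extractOpt[List[String]],
          (json \ "x4").extractOpt[String],
          (json \ "x5").extractOpt[String],
          (json \ "x6").extractOpt[String],
          (json \ "x7").extractOpt[String],
          (json \ "x8").extractOpt[String],
          (json \ "x9").extractOpt[String])
      }
    }

    // val data = test.map(LogLineParser.fromJson)  // closure references only the object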
iterator cause NotSerializableException
Hi, the following code gives me 'Task not serializable: java.io.NotSerializableException: scala.collection.mutable.ArrayOps$ofInt':

var x = sc.parallelize(Array(1,2,3,4,5,6,7,8,9), 3)
var iter = Array(5).toIterator
var value = 5
var value2 = iter.next

x.map(q => q * value).collect()   // Line 1: it works
x.map(q => q * value2).collect()  // Line 2: error

'value' and 'value2' look exactly the same, so why does this happen? An iterator from RDD.toLocalIterator causes this too. I tested it in spark-shell on Spark 1.0.2. Thanks, Kevin
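A hedged workaround sketch for the spark-shell case, under an assumption about the mechanism (each REPL line is compiled into a wrapper object, and a closure that refers to value2 can drag in the wrapper chain that also holds the non-serializable iterator/ArrayOps): copy the value into a plain local first, so the closure captures only an Int.

    val result = {
      val v2 = value2                 // local Int; the closure below captures only v2
      x.map(q => q * v2).collect()
    }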
Re: Got NotSerializableException when access broadcast variable
Thanks for the help.

On Aug 21, 2014, at 10:56, Yin Huai huaiyin@gmail.com wrote: If you want to filter on the table name, you can use hc.sql("show tables").filter(row => !"test".equals(row.getString(0))). Seems making functionRegistry transient can fix the error.

On Wed, Aug 20, 2014 at 8:53 PM, Vida Ha v...@databricks.com wrote: Hi, I doubt the broadcast variable is your problem, since you are seeing: org.apache.spark.SparkException: Task not serializable Caused by: java.io.NotSerializableException: org.apache.spark.sql.hive.HiveContext$$anon$3 We have a knowledgebase article that explains why this happens - it's a very common error I see users triggering on the mailing list: https://github.com/databricks/spark-knowledgebase/blob/master/troubleshooting/javaionotserializableexception.md Are you using the HiveContext within a transformation that is called on an RDD? That will definitely create a problem. -Vida

On Wed, Aug 20, 2014 at 1:20 AM, tianyi tia...@asiainfo.com wrote: Thanks for the help. I ran this script again with bin/spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer. In the console, I can see:

scala> sc.getConf.getAll.foreach(println)
(spark.tachyonStore.folderName,spark-eaabe986-03cb-41bd-bde5-993c7db3f048)
(spark.driver.host,10.1.51.127)
(spark.executor.extraJavaOptions,-Dsun.io.serialization.extendedDebugInfo=true)
(spark.serializer,org.apache.spark.serializer.KryoSerializer)
(spark.repl.class.uri,http://10.1.51.127:51319)
(spark.app.name,Spark shell)
(spark.driver.extraJavaOptions,-Dsun.io.serialization.extendedDebugInfo=true)
(spark.fileserver.uri,http://10.1.51.127:51322)
(spark.jars,)
(spark.driver.port,51320)
(spark.master,local[*])

But it fails again with the same error.

On Aug 20, 2014, at 15:59, Fengyun RAO raofeng...@gmail.com wrote: try: sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

2014-08-20 14:27 GMT+08:00 田毅 tia...@asiainfo.com: Hi everyone! I got an exception when I ran my script with spark-shell. ...
Got NotSerializableException when access broadcast variable
Hi everyone! I got an exception when I ran my script with spark-shell. I added

SPARK_JAVA_OPTS="-Dsun.io.serialization.extendedDebugInfo=true"

in spark-env.sh to show the following stack:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
    at org.apache.spark.sql.SchemaRDD.filter(SchemaRDD.scala:460)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:18)
    at $iwC$$iwC$$iwC.<init>(<console>:23)
    at $iwC$$iwC.<init>(<console>:25)
    at $iwC.<init>(<console>:27)
    at <init>(<console>:29)
    at .<init>(<console>:33)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    ...
Caused by: java.io.NotSerializableException: org.apache.spark.sql.hive.HiveContext$$anon$3
    - field (class org.apache.spark.sql.hive.HiveContext, name: functionRegistry, type: class org.apache.spark.sql.hive.HiveFunctionRegistry)
    - object (class org.apache.spark.sql.hive.HiveContext, org.apache.spark.sql.hive.HiveContext@4648e685)
    - field (class $iwC$$iwC$$iwC$$iwC, name: hc, type: class org.apache.spark.sql.hive.HiveContext)
    - object (class $iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC@23d652ef)
    - field (class $iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC, $iwC$$iwC$$iwC@71cc14f1)
    - field (class $iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC)
    - object (class $iwC$$iwC, $iwC$$iwC@74eca89e)
    - field (class $iwC, name: $iw, type: class $iwC$$iwC)
    - object (class $iwC, $iwC@685c4cc4)
    - field (class $line9.$read, name: $iw, type: class $iwC)
    - object (class $line9.$read, $line9.$read@519f9aae)
    - field (class $iwC$$iwC$$iwC, name: $VAL7, type: class $line9.$read)
    - object (class $iwC$$iwC$$iwC, $iwC$$iwC$$iwC@4b996858)
    - field (class $iwC$$iwC$$iwC$$iwC, name: $outer, type: class $iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC@31d646d4)
    - field (class $iwC$$iwC$$iwC$$iwC$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC)
    - root object (class $iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)

I wrote some simple scripts to reproduce this problem.

Case 1:

val barr1 = sc.broadcast("test")
val sret = sc.parallelize(1 to 10, 2)
val ret = sret.filter(row => !barr1.equals("test"))
ret.collect.foreach(println)

It works fine in local mode and yarn-client mode.

Case 2:

val barr1 = sc.broadcast("test")
val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val sret = hc.sql("show tables")
val ret = sret.filter(row => !barr1.equals("test"))
ret.collect.foreach(println)

It throws java.io.NotSerializableException: org.apache.spark.sql.hive.HiveContext in local mode and yarn-client mode, but the same code works fine if I put it in a Scala file and run it in IntelliJ IDEA:

import org.apache.spark.{SparkConf, SparkContext}

object TestBroadcast2 {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Broadcast Test").setMaster("local[3]")
    val sc = new SparkContext(sparkConf)
    val barr1 = sc.broadcast("test")
    val hc = new org.apache.spark.sql.hive.HiveContext(sc)
    val sret = hc.sql("show tables")
    val ret = sret.filter(row => !barr1.equals("test"))
    ret.collect.foreach(println)
  }
}
Re: Got NotSerializableException when access broadcast variable
Thanks for the help. I ran this script again with bin/spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer. In the console, I can see:

scala> sc.getConf.getAll.foreach(println)
(spark.tachyonStore.folderName,spark-eaabe986-03cb-41bd-bde5-993c7db3f048)
(spark.driver.host,10.1.51.127)
(spark.executor.extraJavaOptions,-Dsun.io.serialization.extendedDebugInfo=true)
(spark.serializer,org.apache.spark.serializer.KryoSerializer)
(spark.repl.class.uri,http://10.1.51.127:51319)
(spark.app.name,Spark shell)
(spark.driver.extraJavaOptions,-Dsun.io.serialization.extendedDebugInfo=true)
(spark.fileserver.uri,http://10.1.51.127:51322)
(spark.jars,)
(spark.driver.port,51320)
(spark.master,local[*])

But it fails again with the same error.

On Aug 20, 2014, at 15:59, Fengyun RAO raofeng...@gmail.com wrote: try: sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

2014-08-20 14:27 GMT+08:00 田毅 tia...@asiainfo.com: Hi everyone! I got an exception when I ran my script with spark-shell. The full stack trace and repro script are in the original post above. ...
Re: Got NotSerializableException when access broadcast variable
Hi, I doubt the broadcast variable is your problem, since you are seeing:

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.spark.sql.hive.HiveContext$$anon$3

We have a knowledgebase article that explains why this happens - it's a very common error I see users triggering on the mailing list: https://github.com/databricks/spark-knowledgebase/blob/master/troubleshooting/javaionotserializableexception.md

Are you using the HiveContext within a transformation that is called on an RDD? That will definitely create a problem. -Vida

On Wed, Aug 20, 2014 at 1:20 AM, tianyi tia...@asiainfo.com wrote: Thanks for the help. I ran this script again with bin/spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer ... but it fails again with the same error. ...
Re: Got NotSerializableException when access broadcast variable
If you want to filter on the table name, you can use hc.sql("show tables").filter(row => !"test".equals(row.getString(0))). Seems making functionRegistry transient can fix the error.

On Wed, Aug 20, 2014 at 8:53 PM, Vida Ha v...@databricks.com wrote: Hi, I doubt the broadcast variable is your problem, since you are seeing: org.apache.spark.SparkException: Task not serializable Caused by: java.io.NotSerializableException: org.apache.spark.sql.hive.HiveContext$$anon$3 ... Are you using the HiveContext within a transformation that is called on an RDD? That will definitely create a problem. -Vida

On Wed, Aug 20, 2014 at 1:20 AM, tianyi tia...@asiainfo.com wrote: Thanks for the help. I ran this script again with bin/spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer ... but it fails again with the same error. ...
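To make Vida's warning and Yin's suggestion concrete, a minimal Scala sketch (hypothetical usage, not code from this thread): use the HiveContext only on the driver to build the query, and keep it out of any function that ships to executors.

    val hc = new org.apache.spark.sql.hive.HiveContext(sc)

    // To be avoided: referencing hc from inside a function that runs on
    // executors, e.g. rdd.filter(name => hc.sql("...").count() > 0),
    // which forces Spark to serialize the HiveContext itself.

    // Driver-side alternative in the spirit of Yin's fix: hc builds the
    // query plan on the driver; the filter closure captures only a Row
    // predicate, nothing from the context.
    val nonTest = hc.sql("show tables").filter(row => !"test".equals(row.getString(0)))
    nonTest.collect().foreach(println)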
RE: Got NotSerializableException when access broadcast variable
PR is https://github.com/apache/spark/pull/2074.

From: Yin Huai huaiyin@gmail.com
Sent: 8/20/2014 10:56 PM
To: Vida Ha v...@databricks.com
Cc: tianyi tia...@asiainfo.com; Fengyun RAO raofeng...@gmail.com; user@spark.apache.org
Subject: Re: Got NotSerializableException when access broadcast variable

If you want to filter on the table name, you can use hc.sql("show tables").filter(row => !"test".equals(row.getString(0))). Seems making functionRegistry transient can fix the error. ...
NotSerializableException
Hi, I took Avro 1.7.7 and recompiled my distribution to fix the issue I hit when dealing with Avro GenericRecord (I'm referring to AVRO-1476), and that issue was resolved. I also enabled Kryo registration in SparkConf. That said, I am still seeing a NotSerializableException for Schema$RecordSchema. Do I need to do anything else? Thanks, Ron Sent from my iPad
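No reply appears in the archive, but two things are worth checking, stated here as assumptions rather than a confirmed fix for Ron's job. First, Kryo registration only affects data serialization; task closures are still Java-serialized, so an Avro Schema referenced from a closure fails regardless of Kryo settings (one workaround is shipping the schema's JSON string and re-parsing it per partition). Second, if the Schema travels as data, a custom Kryo serializer that round-trips the schema through its JSON form can work. A sketch, with hypothetical class names:

    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.avro.Schema
    import org.apache.spark.serializer.KryoRegistrator

    class AvroSchemaKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        val schemaSerializer = new Serializer[Schema] {
          override def write(kryo: Kryo, output: Output, schema: Schema): Unit =
            output.writeString(schema.toString)               // schema as its JSON form
          override def read(kryo: Kryo, input: Input, cls: Class[Schema]): Schema =
            new Schema.Parser().parse(input.readString())
        }
        // RecordSchema is the concrete class named in the exception
        kryo.register(Class.forName("org.apache.avro.Schema$RecordSchema"), schemaSerializer)
      }
    }

    // conf.set("spark.kryo.registrator", classOf[AvroSchemaKryoRegistrator].getName)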
NotSerializableException exception while using TypeTag in Scala 2.10
I am trying to serialize objects contained in RDDs using runtime reflection via TypeTag. However, the Spark job keeps failing with java.io.NotSerializableException on an instance of TypeCreator (auto-generated by the compiler to enable TypeTags). Is there any workaround for this without switching to Scala 2.11?
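A hedged workaround sketch (a general pattern, not a confirmed fix for this job): resolve everything you need from the TypeTag on the driver, and ship only plain serializable values inside the closure, so the compiler-generated TypeCreator never has to be serialized.

    import scala.reflect.runtime.universe._
    import org.apache.spark.rdd.RDD

    // hypothetical helper: tag each element with its runtime type name
    def tagged[T: TypeTag](rdd: RDD[T]): RDD[(String, T)] = {
      val typeName = typeOf[T].toString   // evaluated once, on the driver
      rdd.map(x => (typeName, x))         // the closure captures only a String
    }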
Re: NotSerializableException in Spark Streaming
Yep, here goes! Here are my environment vitals:

- Spark 1.0.0
- EC2 cluster with 1 slave spun up using spark-ec2
- twitter4j 3.0.3
- spark-shell called with --jars argument to load spark-streaming-twitter_2.10-1.0.0.jar as well as all the twitter4j jars.

Now, while I'm in the Spark shell, I enter the following:

import twitter4j.auth.{Authorization, OAuthAuthorization}
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

def getAuth(): Option[Authorization] = {
  System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
  System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
  System.setProperty("twitter4j.oauth.accessToken", accessToken)
  System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
  Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
}

def noop(a: Any): Any = {
  a
}

val ssc = new StreamingContext(sc, Seconds(5))
val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
val liveTweets = liveTweetObjects.map(_.getText)

liveTweets.map(t => noop(t)).print()

ssc.start()

So basically, I'm just printing Tweets as-is, but first I'm mapping them to themselves via noop(). The Tweets will start to flow just fine for a minute or so, and then, this:

14/07/24 23:13:30 ERROR JobScheduler: Error running job streaming job 140624361 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The time-to-first-error is variable. This is the simplest repro I can show at this time. Doing more complex things with liveTweets that involve a KMeansModel, for example, will be interrupted more quickly by this java.io.NotSerializableException. I don't know if the root cause is the same, but the error certainly is. By the way, trying to reproduce this on 1.0.1 doesn't raise the same error, but I can't dig deeper to make sure this is really resolved (e.g. by trying more complex things that need data) due to SPARK-2471 (https://issues.apache.org/jira/browse/SPARK-2471). I see that that issue has been resolved, so I'll try this whole process again using the latest from master and see how it goes. Nick

On Tue, Jul 15, 2014 at 5:58 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I am very curious though. Can you post a concise code example which we can run to reproduce this problem? TD

On Tue, Jul 15, 2014 at 2:54 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I am not entirely sure off the top of my head. But a possible (usually works) workaround is to define the function as a val instead of a def. ...

On Tue, Jul 15, 2014 at 9:21 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hey Diana, Did you ever figure this out? I'm running into the same exception, except in my case the function I'm ...
Re: NotSerializableException in Spark Streaming
Hey Diana, Did you ever figure this out? I'm running into the same exception, except in my case the function I'm calling is a KMeans model.predict(). In regular Spark it works, and Spark Streaming without the call to model.predict() also works, but when put together I get this serialization exception. I'm on 1.0.0. Nick

On Thu, May 8, 2014 at 6:37 AM, Diana Carroll dcarr...@cloudera.com wrote: Hey all, trying to set up a pretty simple streaming app and getting some weird behavior. ... So why is it able to serialize my little function in regular Spark, but not in Streaming? Thanks, Diana
Re: NotSerializableException in Spark Streaming
I am very curious though. Can you post a concise code example which we can run to reproduce this problem? TD

On Tue, Jul 15, 2014 at 2:54 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I am not entirely sure off the top of my head. But a possible (usually works) workaround is to define the function as a val instead of a def. For example,

def func(i: Int): Boolean = { true }

can be written as

val func = (i: Int) => { true }

Hope this helps for now. TD

On Tue, Jul 15, 2014 at 9:21 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hey Diana, Did you ever figure this out? I'm running into the same exception, except in my case the function I'm calling is a KMeans model.predict(). ...

On Thu, May 8, 2014 at 6:37 AM, Diana Carroll dcarr...@cloudera.com wrote: Hey all, trying to set up a pretty simple streaming app and getting some weird behavior. ... So why is it able to serialize my little function in regular Spark, but not in Streaming? Thanks, Diana
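A short sketch of why TD's val-over-def workaround can matter (a general explanation, not a confirmed diagnosis of these sessions): a def is a method on its enclosing object, so referring to it captures that object, while a function val is a self-contained object with no outer reference.

    class Pipeline(ssc: org.apache.spark.streaming.StreamingContext) {
      // method: using it in a transformation means shipping this.matchesDef,
      // which drags the whole Pipeline (and its StreamingContext) along
      def matchesDef(s: String): String = "KBDOC-[0-9]*".r.findFirstIn(s).orNull

      // function value: a free-standing Function1 that references no members,
      // so it carries no pointer back to Pipeline
      val matchesVal: String => String = s => "KBDOC-[0-9]*".r.findFirstIn(s).orNull
    }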
Spark and Cassandra - NotSerializableException
Hi, I am writing a standalone Spark program that gets its data from Cassandra. I followed the examples and created the RDD via newAPIHadoopRDD() and the ColumnFamilyInputFormat class. The RDD is created, but I get a NotSerializableException when I call the RDD's groupByKey() method:

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster("local").setAppName("Test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    Job job = new Job();
    Configuration jobConf = job.getConfiguration();
    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    ConfigHelper.setInputInitialAddress(jobConf, host);
    ConfigHelper.setInputRpcPort(jobConf, port);
    ConfigHelper.setOutputInitialAddress(jobConf, host);
    ConfigHelper.setOutputRpcPort(jobConf, port);
    ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true);
    ConfigHelper.setInputPartitioner(jobConf, "Murmur3Partitioner");
    ConfigHelper.setOutputPartitioner(jobConf, "Murmur3Partitioner");

    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setFinish(new byte[0]);
    sliceRange.setStart(new byte[0]);
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(jobConf, predicate);

    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd = ctx.newAPIHadoopRDD(
            jobConf,
            ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
            ByteBuffer.class, SortedMap.class);

    JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd = rdd.groupByKey();
    System.out.println(groupRdd.count());
}

The exception:

java.io.NotSerializableException: java.nio.HeapByteBuffer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)

What I am trying to do is to merge all row key columns into a single entry. I also get the same exception when I try to use the reduceByKey() method like so:

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd = rdd.reduceByKey(
    new Function2<SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>>() {
        public SortedMap<ByteBuffer, IColumn> call(SortedMap<ByteBuffer, IColumn> arg0,
                SortedMap<ByteBuffer, IColumn> arg1) throws Exception {
            SortedMap<ByteBuffer, IColumn> sortedMap = new TreeMap<ByteBuffer, IColumn>(arg0.comparator());
            sortedMap.putAll(arg0);
            sortedMap.putAll(arg1);
            return sortedMap;
        }
    });

I am using: 1. spark-1.0.0-bin-hadoop1, 2. Cassandra 1.2.12, 3. Java 1.6. Does anyone know what the problem is? What is there that fails serialization? Thanks, Shai
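The exception itself points at the answer: java.nio.HeapByteBuffer does not implement java.io.Serializable, so any shuffle (groupByKey/reduceByKey) over ByteBuffer keys fails with the default Java serializer. A sketch of one way out, written in Scala for brevity (hypothetical helper, not Shai's code): copy each key into a plain byte array before the shuffle. The SortedMap values may need a similar conversion if their implementation class is not serializable either.

    import java.nio.ByteBuffer
    import org.apache.spark.SparkContext._   // pair-RDD implicits on Spark 1.x
    import org.apache.spark.rdd.RDD

    def toBytes(buf: ByteBuffer): Array[Byte] = {
      val dup = buf.duplicate()              // leave the original position untouched
      val arr = new Array[Byte](dup.remaining())
      dup.get(arr)
      arr
    }

    def regroup[V](rdd: RDD[(ByteBuffer, V)]): RDD[(Seq[Byte], Iterable[V])] =
      rdd.map { case (k, v) => (toBytes(k).toSeq, v) }.groupByKey()
      // .toSeq: Array[Byte] compares by reference, while a Seq[Byte] key
      // has the value-based equals/hashCode that groupByKey needs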
KMeans.train() throws NotSerializableException
When I called KMeans.train(), an error happened:

14/06/04 13:02:29 INFO scheduler.DAGScheduler: Submitting Stage 3 (MappedRDD[12] at map at KMeans.scala:123), which has no missing parents
14/06/04 13:02:29 INFO scheduler.DAGScheduler: Failed to run takeSample at KMeans.scala:260
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.collection.convert.Wrappers$MapWrapper
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

My code looks like:

JavaRDD<Vector> docVectors = ...;
int numClusters = 20;
int numIterations = 20;
KMeansModel clusters = KMeans.train(docVectors.rdd(), numClusters, numIterations);

version: 1.0.0
2014-06-04 bluejoe2008
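scala.collection.convert.Wrappers$MapWrapper is what you get when a Scala Map is exposed to Java (or back) via JavaConversions, and the wrapper is not serializable. A hedged sketch (hypothetical input shape, not bluejoe's actual code): copy any wrapped collection into plain arrays before building the vectors that go into the RDD passed to KMeans.train(), so nothing in the RDD lineage holds a wrapper.

    import org.apache.spark.mllib.linalg.{Vector, Vectors}

    // hypothetical: per-document term weights arriving as a Java map
    def toDenseVector(termWeights: java.util.Map[Integer, java.lang.Double], dim: Int): Vector = {
      val arr = new Array[Double](dim)
      val it = termWeights.entrySet().iterator()
      while (it.hasNext) {
        val e = it.next()
        arr(e.getKey.intValue) = e.getValue.doubleValue
      }
      Vectors.dense(arr)   // backed by a plain Array[Double], which serializes fine
    }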
NotSerializableException in Spark Streaming
Hey all, trying to set up a pretty simple streaming app and getting some weird behavior. First, a non-streaming job that works fine: I'm trying to pull out lines of a log file that match a regex, for which I've set up a function:

def getRequestDoc(s: String): String = {
  "KBDOC-[0-9]*".r.findFirstIn(s).orNull
}
logs = sc.textFile(logfiles)
logs.map(getRequestDoc).take(10)

That works, but I want to run it on the same data, streaming, so I tried this:

val logs = ssc.socketTextStream("localhost", ...)
logs.map(getRequestDoc).print()
ssc.start()

From this code, I get:

14/05/08 03:32:08 ERROR JobScheduler: Error running job streaming job 1399545128000 ms.0
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext

But if I do the map function inline instead of calling a separate function, it works:

logs.map("KBDOC-[0-9]*".r.findFirstIn(_).orNull).print()

So why is it able to serialize my little function in regular Spark, but not in Streaming? Thanks, Diana
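One explanation consistent with the stack trace (an assumption, not confirmed in this thread): in the shell, getRequestDoc is a method on the REPL line wrapper, and in the streaming session that wrapper chain also holds ssc, so serializing the closure tries to serialize the StreamingContext; the inline anonymous function captures nothing, which is why it works. A sketch of moving the function into a standalone object so the wrapper is never captured:

    object RequestDocExtractor extends Serializable {
      def getRequestDoc(s: String): String =
        "KBDOC-[0-9]*".r.findFirstIn(s).orNull
    }

    // logs.map(RequestDocExtractor.getRequestDoc).print()  // captures only the object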