[ https://issues.apache.org/jira/browse/SPARK-4196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Patrick Wendell reassigned SPARK-4196:
--------------------------------------

    Assignee: Patrick Wendell

> Streaming + checkpointing + saveAsNewAPIHadoopFiles = NotSerializableException for Hadoop Configuration
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-4196
>                 URL: https://issues.apache.org/jira/browse/SPARK-4196
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Sean Owen
>            Assignee: Patrick Wendell
>
> I am reasonably sure there is some issue here in Streaming and that I'm not
> missing something basic, but I'm not 100% sure. I went ahead and posted it as
> a JIRA to track, since it's come up a few times before without resolution,
> and right now I can't get checkpointing to work at all.
>
> When Spark Streaming checkpointing is enabled, I see a
> NotSerializableException thrown for a Hadoop Configuration object, and it
> seems that it is not one from my user code.
>
> Before I describe my particular instance, see
> http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3c1408135046777-12202.p...@n3.nabble.com%3E
> for another occurrence.
>
> I was also on a customer site last week debugging an identical issue with
> checkpointing in a Scala-based program, and they also could not enable
> checkpointing without hitting exactly this error.
>
> The essence of my code is:
> {code}
> final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
> JavaStreamingContextFactory streamingContextFactory =
>     new JavaStreamingContextFactory() {
>       @Override
>       public JavaStreamingContext create() {
>         return new JavaStreamingContext(sparkContext,
>                                         new Duration(batchDurationMS));
>       }
>     };
> streamingContext = JavaStreamingContext.getOrCreate(
>     checkpointDirString, sparkContext.hadoopConfiguration(),
>     streamingContextFactory, false);
> streamingContext.checkpoint(checkpointDirString);
> {code}
> It yields:
> {code}
> 2014-10-31 14:29:00,211 ERROR OneForOneStrategy:66
> org.apache.hadoop.conf.Configuration
> - field (class
>   "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9",
>   name: "conf$2", type: "class org.apache.hadoop.conf.Configuration")
> - object (class
>   "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9",
>   <function2>)
> - field (class "org.apache.spark.streaming.dstream.ForEachDStream",
>   name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc",
>   type: "interface scala.Function2")
> - object (class "org.apache.spark.streaming.dstream.ForEachDStream",
>   org.apache.spark.streaming.dstream.ForEachDStream@cb8016a)
> ...
> {code}
> This looks like it is due to PairDStreamFunctions.saveAsNewAPIHadoopFiles,
> whose saveFunc appears to be the
> org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9 above:
> {code}
> def saveAsNewAPIHadoopFiles(
>     prefix: String,
>     suffix: String,
>     keyClass: Class[_],
>     valueClass: Class[_],
>     outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
>     conf: Configuration = new Configuration
>   ) {
>   val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
>     val file = rddToFileName(prefix, suffix, time)
>     rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass,
>       outputFormatClass, conf)
>   }
>   self.foreachRDD(saveFunc)
> }
> {code}
> Is that not a problem? But then I don't know how it would ever work in Spark.
> Then again, maybe this is why it only shows up when checkpointing is enabled:
> checkpointing serializes the entire DStream graph, including ForEachDStream's
> foreachFunc and the Configuration it captures, whereas without checkpointing
> that closure never needs to be serialized at all.
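> If that is the problem, one possible fix (just a sketch on my part, untested)
> would be for saveAsNewAPIHadoopFiles to wrap the Configuration in Spark's
> existing org.apache.spark.SerializableWritable before the closure captures
> it; Configuration implements Writable, which is all that wrapper needs:
> {code}
> def saveAsNewAPIHadoopFiles(
>     prefix: String,
>     suffix: String,
>     keyClass: Class[_],
>     valueClass: Class[_],
>     outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
>     conf: Configuration = new Configuration
>   ) {
>   // Capture a serializable wrapper rather than the raw Configuration, so
>   // checkpointing can serialize the DStream graph containing saveFunc.
>   val serializableConf = new SerializableWritable(conf)
>   val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
>     val file = rddToFileName(prefix, suffix, time)
>     rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass,
>       outputFormatClass, serializableConf.value)
>   }
>   self.foreachRDD(saveFunc)
> }
> {code}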
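> In the meantime, a user-level workaround (again only a sketch; "stream",
> "prefix", "suffix" and the class arguments are placeholders, and it assumes
> a default Configuration is acceptable for the output) is to bypass
> saveAsNewAPIHadoopFiles and call foreachRDD directly, constructing the
> Configuration inside the function so the checkpointed closure captures
> nothing non-serializable:
> {code}
> stream.foreachRDD { (rdd, time) =>
>   // Build the Configuration here instead of capturing one from the
>   // enclosing scope; the closure itself then serializes cleanly.
>   val file = prefix + "-" + time.milliseconds + "." + suffix
>   rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass,
>     outputFormatClass, new Configuration())
> }
> {code}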
> Long shot, but I wonder if it is related to closure issues like
> https://issues.apache.org/jira/browse/SPARK-1866



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org