Repository: spark Updated Branches: refs/heads/master c06e42f2c -> a65766bf0
[SPARK-5826][Streaming] Fix Configuration not serializable problem Author: jerryshao <saisai.s...@intel.com> Closes #4612 from jerryshao/SPARK-5826 and squashes the following commits: 7ec71db [jerryshao] Remove transient for conf statement 88d84e6 [jerryshao] Fix Configuration not serializable problem Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a65766bf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a65766bf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a65766bf Branch: refs/heads/master Commit: a65766bf0244a41b793b9dc5fbdd2882664ad00e Parents: c06e42f Author: jerryshao <saisai.s...@intel.com> Authored: Tue Feb 17 10:45:18 2015 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Tue Feb 17 10:45:18 2015 +0000 ---------------------------------------------------------------------- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a65766bf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 6379b88..4f7db41 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -18,7 +18,6 @@ package org.apache.spark.streaming.dstream import java.io.{IOException, ObjectInputStream} -import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable import scala.reflect.ClassTag @@ -27,6 +26,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import org.apache.spark.SerializableWritable import org.apache.spark.rdd.{RDD, UnionRDD} import org.apache.spark.streaming._ import org.apache.spark.util.{TimeStampedHashMap, Utils} @@ -78,6 +78,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]) extends InputDStream[(K, V)](ssc_) { + private val serializableConfOpt = conf.map(new SerializableWritable(_)) + // This is a def so that it works during checkpoint recovery: private def clock = ssc.scheduler.clock @@ -240,7 +242,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( /** Generate one RDD from an array of files */ private def filesToRDD(files: Seq[String]): RDD[(K, V)] = { val fileRDDs = files.map(file =>{ - val rdd = conf match { + val rdd = serializableConfOpt.map(_.value) match { case Some(config) => context.sparkContext.newAPIHadoopFile( file, fm.runtimeClass.asInstanceOf[Class[F]], --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org