Repository: spark
Updated Branches:
  refs/heads/master c06e42f2c -> a65766bf0


[SPARK-5826][Streaming] Fix Configuration not serializable problem

Author: jerryshao <saisai.s...@intel.com>

Closes #4612 from jerryshao/SPARK-5826 and squashes the following commits:

7ec71db [jerryshao] Remove transient for conf statement
88d84e6 [jerryshao] Fix Configuration not serializable problem
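
----------------------------------------------------------------------
Background: org.apache.hadoop.conf.Configuration implements Hadoop's
Writable interface but not java.io.Serializable, so a DStream that
holds a Configuration in a plain field cannot be Java-serialized for
checkpointing. A minimal sketch of the failure mode (illustrative
only, not part of the patch):

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration

object ConfNotSerializable {
  def main(args: Array[String]): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      // Fails: Configuration is a Writable, not a java.io.Serializable.
      out.writeObject(new Configuration())
    } catch {
      case e: NotSerializableException =>
        // DStream checkpointing hits exactly this when a DStream field
        // references a raw Configuration.
        println(s"As expected: $e")
    }
  }
}
----------------------------------------------------------------------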


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a65766bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a65766bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a65766bf

Branch: refs/heads/master
Commit: a65766bf0244a41b793b9dc5fbdd2882664ad00e
Parents: c06e42f
Author: jerryshao <saisai.s...@intel.com>
Authored: Tue Feb 17 10:45:18 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Feb 17 10:45:18 2015 +0000

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/FileInputDStream.scala  | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a65766bf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 6379b88..4f7db41 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.streaming.dstream
 
 import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
@@ -27,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
+import org.apache.spark.SerializableWritable
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.util.{TimeStampedHashMap, Utils}
@@ -78,6 +78,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
     (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
   extends InputDStream[(K, V)](ssc_) {
 
+  private val serializableConfOpt = conf.map(new SerializableWritable(_))
+
   // This is a def so that it works during checkpoint recovery:
   private def clock = ssc.scheduler.clock
 
@@ -240,7 +242,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
     val fileRDDs = files.map(file =>{
-      val rdd = conf match {
+      val rdd = serializableConfOpt.map(_.value) match {
         case Some(config) => context.sparkContext.newAPIHadoopFile(
           file,
           fm.runtimeClass.asInstanceOf[Class[F]],
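
----------------------------------------------------------------------
The fix wraps the optional Configuration in
org.apache.spark.SerializableWritable, which serializes the wrapped
object through Hadoop's Writable machinery rather than Java
serialization of the object itself, and exposes it again via .value.
A minimal sketch of the wrapper round-trip (the "demo.key" setting is
hypothetical, used only for illustration):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SerializableWritable

object SerializableConfRoundTrip {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("demo.key", "demo.value") // hypothetical key, illustration only

    // Wrap before serializing, as the patched FileInputDStream does.
    val wrapped = new SerializableWritable(conf)

    // Round-trip through Java serialization, as checkpointing would.
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(wrapped)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val restored = in.readObject().asInstanceOf[SerializableWritable[Configuration]]

    // .value recovers the underlying Configuration on the other side.
    assert(restored.value.get("demo.key") == "demo.value")
  }
}
----------------------------------------------------------------------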

