Hi, 
We are designing a solution that pulls file paths from Kafka and, at the
current stage, just counts the lines in each of these files.
When we run the code it fails with:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
        at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:444)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$class.mapToPair(JavaDStreamLike.scala:146)
        at org.apache.spark.streaming.api.java.JavaPairDStream.mapToPair(JavaPairDStream.scala:46)
        at streamReader.App.main(App.java:66)

Is using the SparkContext from inside a map function wrong?

This is the code we are using: 
        SparkConf conf = new SparkConf()
                .setAppName("Simple Application")
                .setMaster("spark://namenode:7077");

        // KAFKA
        final JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put("uploadedFiles", 1);
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, "localhost:2181", "group3", topicMap);

        JavaDStream<String> files = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // this mapToPair is App.java:66, where the "Task not serializable" exception is thrown
        JavaPairDStream<String, Integer> pairs = messages.mapToPair(
                new PairFunction<Tuple2<String, String>, String, Integer>() {
                    public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception {
                        // the StreamingContext's SparkContext is used inside the closure here
                        JavaRDD<String> textfile = jssc.sparkContext().textFile(word._2());
                        int test = new Long(textfile.count()).intValue();
                        return new Tuple2<String, Integer>(word._2(), test);
                    }
                });

        System.out.println("Printing Messages:");
        pairs.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
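
For reference, this is the direction we are considering instead (an untested
sketch using the same variables as above, and assuming each batch only carries
a handful of paths): have foreachRDD collect the paths on the driver and call
textFile()/count() from there, so the SparkContext is never referenced inside
a task closure. Would that be the recommended pattern, or is there a better way?

        // sketch only, not tested: count lines per file path on the driver
        files.foreachRDD(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) throws Exception {
                // collect() brings the (small) batch of file paths back to the driver
                for (String path : rdd.collect()) {
                    // SparkContext is only used here, on the driver side
                    long count = jssc.sparkContext().textFile(path).count();
                    System.out.println(path + " -> " + count + " lines");
                }
                return null;
            }
        });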

Thanks, 
Daniel


