Yes, you can never use the SparkContext inside a remote function; it exists only on the driver and is not serializable. That is exactly what the "Task not serializable" exception is telling you: the anonymous PairFunction you pass to mapToPair captures jssc, so Spark cannot serialize the closure to ship it to the executors.
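The stack trace comes from Spark's ClosureCleaner, which Java-serializes the closure before shipping it to executors; any captured reference to a non-serializable object fails at that point. The same failure can be reproduced with nothing but the JDK. This is a minimal sketch, where the hypothetical ResourceHolder stands in for the non-serializable JavaStreamingContext:

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class ClosureDemo {

    // Hypothetical stand-in for JavaStreamingContext: it owns driver-side
    // resources and is deliberately NOT Serializable.
    static class ResourceHolder {
        long count(String path) {
            return 42L;
        }
    }

    // Attempt to Java-serialize a closure, roughly what Spark's
    // ClosureCleaner does before shipping a task to the executors.
    // Returns "ok" on success, or the name of the failure.
    static String trySerialize(Object closure) {
        try (ObjectOutputStream out =
                     new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(closure);
            return "ok";
        } catch (NotSerializableException e) {
            return "NotSerializableException";
        } catch (Exception e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        ResourceHolder ctx = new ResourceHolder();  // lives on the "driver"

        // The lambda captures ctx, so serializing the closure drags ctx
        // along with it and fails, just like capturing jssc in mapToPair.
        Function<String, Long> task =
                (Function<String, Long> & Serializable) path -> ctx.count(path);

        System.out.println(trySerialize(task));  // prints NotSerializableException
    }
}
```

A closure that does not capture the context serializes fine, which is why moving the SparkContext usage out of the function fixes the error.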
On Sun, Mar 8, 2015 at 4:22 PM, Daniel Haviv <daniel.ha...@veracity-group.com> wrote:
> Hi,
> We are designing a solution which pulls file paths from Kafka and for the
> current stage just counts the lines in each of these files.
> When running the code it fails on:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
>     at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:444)
>     at org.apache.spark.streaming.api.java.JavaDStreamLike$class.mapToPair(JavaDStreamLike.scala:146)
>     at org.apache.spark.streaming.api.java.JavaPairDStream.mapToPair(JavaPairDStream.scala:46)
>     at streamReader.App.main(App.java:66)
>
> Is using the sparkContext from inside a map function wrong?
>
> This is the code we are using:
>
> SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("spark://namenode:7077");
>
> // KAFKA
> final JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));
> Map<String, Integer> topicMap = new HashMap<String, Integer>();
> topicMap.put("uploadedFiles", 1);
> JavaPairReceiverInputDStream<String, String> messages =
>         KafkaUtils.createStream(jssc, "localhost:2181", "group3", topicMap);
>
> JavaDStream<String> files = messages.map(new Function<Tuple2<String, String>, String>() {
>     public String call(Tuple2<String, String> tuple2) {
>         return tuple2._2();
>     }
> });
>
> JavaPairDStream<String, Integer> pairs = messages.mapToPair(
>     new PairFunction<Tuple2<String, String>, String, Integer>() {
>         public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception {
>             JavaRDD<String> textfile = jssc.sparkContext().textFile(word._2());
>             int test = new Long(textfile.count()).intValue();
>             return new Tuple2<String, Integer>(word._2(), test);
>         }
>     });
>
> System.out.println("Printing Messages:");
> pairs.print();
>
> jssc.start();
> jssc.awaitTermination();
> jssc.close();
>
> Thanks,
> Daniel

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
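For the use case in the question, one way around it is to bring the per-file work back to the driver with foreachRDD, whose function runs on the driver where the streaming context is valid. This is an untested sketch against the Spark 1.x Java API, reusing the `files` DStream and `jssc` from Daniel's code; it assumes each batch carries only a small number of paths, since collect() pulls them to the driver:

```java
// Sketch: count lines per file path on the driver, where jssc is usable.
// `files` is the JavaDStream<String> of paths from the code above.
files.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) {
        // This function executes on the driver, so using the
        // SparkContext here does not trip the ClosureCleaner.
        for (String path : rdd.collect()) {
            long lines = jssc.sparkContext().textFile(path).count();
            System.out.println(path + ": " + lines);
        }
        return null;
    }
});
```

Each textFile(path).count() is still a distributed job; only the loop over paths and the launching of those jobs happen on the driver.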