Hi All,
Would really appreciate it if someone in the community could help me with this. I have a simple Java Spark Streaming application (NetworkWordCount):

    SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream(hostname, port);
    JavaDStream<String> words = lines.flatMap(new SplitLines());
    JavaPairDStream<String, Integer> wordMap = words.mapToPair(new MapWords());
    JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());
    wordCount.saveAsNewAPIHadoopFiles(output + "/wordcount", "txt", Text.class,
        IntWritable.class, TextOutputFormat.class, clstrConf);
    jssc.start();
    jssc.awaitTermination();

The problem is with this line:

    wordCount.saveAsNewAPIHadoopFiles(output + "/wordcount", "txt", Text.class,
        IntWritable.class, TextOutputFormat.class, clstrConf);

The compiler complains:

    The method saveAsNewAPIHadoopFiles(String, String, Class<?>, Class<?>,
    Class<? extends OutputFormat<?,?>>, Configuration) in the type
    JavaPairDStream<String,Integer> is not applicable for the arguments
    (String, String, Class<Text>, Class<IntWritable>, Class<TextOutputFormat>,
    Configuration)

The equivalent saveAsNewAPIHadoopFile on JavaPairRDD worked perfectly fine. Would appreciate any pointers.

-- abe
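One common cause of this particular error, for reference: importing TextOutputFormat from org.apache.hadoop.mapred (the old MapReduce API) instead of org.apache.hadoop.mapreduce.lib.output (the new API). saveAsNewAPIHadoopFiles bounds its outputFormatClass parameter by the new API's OutputFormat, and the old-API class sits in an unrelated type hierarchy, so the call is "not applicable for the arguments". The sketch below uses hypothetical stand-in classes (NewOutputFormat, OldOutputFormat, and the two TextOutputFormat mimics are simplified, not the real Hadoop types) to show the bound at work:

```java
// Self-contained sketch: Hadoop's old API (org.apache.hadoop.mapred) and
// new API (org.apache.hadoop.mapreduce) define two unrelated OutputFormat
// types, and saveAsNewAPIHadoopFiles only accepts the new one.
public class OutputFormatMismatch {

    // Mimics org.apache.hadoop.mapreduce.OutputFormat (new API).
    static abstract class NewOutputFormat<K, V> {}

    // Mimics org.apache.hadoop.mapred.OutputFormat (old API) -- a separate hierarchy.
    interface OldOutputFormat<K, V> {}

    // Stand-ins for the two TextOutputFormat classes (simplified, non-generic).
    static class NewTextOutputFormat extends NewOutputFormat<String, Integer> {}
    static class OldTextOutputFormat implements OldOutputFormat<String, Integer> {}

    // Mimics the outputFormatClass parameter of saveAsNewAPIHadoopFiles.
    static String save(Class<? extends NewOutputFormat<?, ?>> outputFormatClass) {
        return outputFormatClass.getSimpleName();
    }

    public static void main(String[] args) {
        // Compiles: the new-API stand-in satisfies the bound.
        System.out.println(save(NewTextOutputFormat.class));  // prints "NewTextOutputFormat"

        // Would NOT compile if uncommented -- the old-API stand-in is outside
        // the NewOutputFormat hierarchy, producing the same kind of
        // "not applicable for the arguments" error as in the question:
        // save(OldTextOutputFormat.class);
    }
}
```

With the old-API import the call site fails exactly like the error quoted above; if that is the cause here, switching the import to org.apache.hadoop.mapreduce.lib.output.TextOutputFormat is the usual fix.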