Hi All,

I would really appreciate it if someone in the community could help me with this.
I have a simple Java Spark Streaming application - NetworkWordCount:

SparkConf sparkConf = new SparkConf()
        .setMaster("yarn-cluster")
        .setAppName("Streaming WordCount");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

JavaReceiverInputDStream<String> lines = jssc.socketTextStream(hostname, port);
JavaDStream<String> words = lines.flatMap(new SplitLines());
JavaPairDStream<String, Integer> wordMap = words.mapToPair(new MapWords());
JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());

wordCount.saveAsNewAPIHadoopFiles(output + "/wordcount", "txt", Text.class,
        IntWritable.class, TextOutputFormat.class, clstrConf);

jssc.start();
jssc.awaitTermination();


I have an issue with this line:

wordCount.saveAsNewAPIHadoopFiles(output + "/wordcount", "txt", Text.class,
        IntWritable.class, TextOutputFormat.class, clstrConf);

It complains with the following error:

The method saveAsNewAPIHadoopFiles(String, String, Class<?>, Class<?>,
Class<? extends OutputFormat<?,?>>, Configuration) in the type
JavaPairDStream<String,Integer> is not applicable for the arguments
(String, String, Class<Text>, Class<IntWritable>, Class<TextOutputFormat>,
Configuration)


The equivalent saveAsNewAPIHadoopFile call on a JavaPairRDD worked perfectly fine for me; a rough sketch of what I mean is below.
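(The variable names sc and inputPath are just placeholders for how that batch job is set up; SplitLines, MapWords and ReduceWords are the same function classes used above, and output / clstrConf are the same as in the streaming code.)

JavaPairRDD<String, Integer> wordCountRdd = sc
        .textFile(inputPath)
        .flatMap(new SplitLines())
        .mapToPair(new MapWords())
        .reduceByKey(new ReduceWords());

// This call compiles without complaint, unlike the DStream version above.
wordCountRdd.saveAsNewAPIHadoopFile(output + "/wordcount", Text.class,
        IntWritable.class, TextOutputFormat.class, clstrConf);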


I would appreciate it if someone could help me with this.

-- 
abe
