saveAsHadoopFiles requires you to specify the output format, which I believe you are not specifying anywhere, and hence the program crashes.
You could try something like this:

    Class<? extends OutputFormat<?,?>> outputFormatClass =
        (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;

    yourStream.saveAsNewAPIHadoopFiles(hdfsUrl, "/output-location", Text.class, Text.class, outputFormatClass);

Thanks
Best Regards

On Fri, Jan 9, 2015 at 10:22 AM, Su She <suhsheka...@gmail.com> wrote:

> Yes, I am calling saveAsHadoopFiles on the DStream. However, when I call
> print on the DStream, it works. If I have to use foreachRDD to call
> saveAsHadoopFile, then why does print work?
>
> Also, if I am doing foreachRDD, do I need connections, or can I simply put
> the saveAsHadoopFiles call inside the foreachRDD function?
>
> Thanks Yana for the help! I will play around with foreachRDD and report my
> results.
>
> Suhas Shekar
>
> University of California, Los Angeles
> B.A. Economics, Specialization in Computing 2014
>
> On Thu, Jan 8, 2015 at 6:06 PM, Yana Kadiyska <yana.kadiy...@gmail.com>
> wrote:
>
>> Are you calling saveAsTextFiles on the DStream? It looks like it. Look
>> at the section called "Design Patterns for using foreachRDD" in the link
>> you sent -- you want to do dstream.foreachRDD(rdd => rdd.saveAs....)
>>
>> On Thu, Jan 8, 2015 at 5:20 PM, Su She <suhsheka...@gmail.com> wrote:
>>
>>> Hello Everyone,
>>>
>>> Thanks in advance for the help!
>>>
>>> I successfully got my Kafka/Spark WordCount app to print locally.
>>> However, I want to run it on a cluster, which means that I will have to
>>> save the output to HDFS if I want to be able to read it.
>>>
>>> I am running Spark 1.1.0, which means, according to this document:
>>> https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html
>>> I should be able to use commands such as saveAsTextFiles/saveAsHadoopFiles.
>>>
>>> 1) When I try saveAsTextFiles it says:
>>> cannot find symbol
>>> [ERROR] symbol  : method saveAsTextFiles(java.lang.String,java.lang.String)
>>> [ERROR] location: class org.apache.spark.streaming.api.java.JavaPairDStream<java.lang.String,java.lang.Integer>
>>>
>>> This makes some sense, as saveAsTextFiles is not included here:
>>> http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html
>>>
>>> 2) When I try
>>> saveAsHadoopFiles("hdfs://ip....us-west-1.compute.internal:8020/user/testwordcount", "txt")
>>> it builds, but when I try running it, it throws this exception:
>>>
>>> Exception in thread "main" java.lang.RuntimeException:
>>> java.lang.RuntimeException: class scala.runtime.Nothing$ not
>>> org.apache.hadoop.mapred.OutputFormat
>>>     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079)
>>>     at org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712)
>>>     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021)
>>>     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
>>>     at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632)
>>>     at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630)
>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>     at scala.util.Try$.apply(Try.scala:161)
>>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:724)
>>> Caused by: java.lang.RuntimeException: class scala.runtime.Nothing$ not
>>> org.apache.hadoop.mapred.OutputFormat
>>>     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2073)
>>>     ... 14 more
>>>
>>> Any help is really appreciated! Thanks.
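For readers landing on this thread later: the double cast in the suggested fix compiles because Java generics are erased at runtime, so any Class token can be funneled through Class<?> into the bounded-wildcard type that saveAsNewAPIHadoopFiles expects. Below is a minimal, Hadoop-free sketch of the same trick; the OutputFormat and SequenceFileOutputFormat types here are stand-ins defined only so the snippet compiles on its own, not the real Hadoop classes.

```java
// Stand-ins for org.apache.hadoop.mapreduce.OutputFormat and
// SequenceFileOutputFormat; hypothetical, defined here only so this
// compiles without Hadoop on the classpath.
interface OutputFormat<K, V> {}
class SequenceFileOutputFormat<K, V> implements OutputFormat<K, V> {}

public class CastDemo {
    // The double cast from the thread: first erase to the raw Class<?>,
    // then widen to the bounded wildcard the save API expects. The cast is
    // unchecked but safe here, since the class really does implement
    // OutputFormat.
    @SuppressWarnings("unchecked")
    static Class<? extends OutputFormat<?, ?>> outputFormatClass() {
        return (Class<? extends OutputFormat<?, ?>>) (Class<?>) SequenceFileOutputFormat.class;
    }

    public static void main(String[] args) {
        Class<? extends OutputFormat<?, ?>> cls = outputFormatClass();
        // Erasure only affects compile-time types: the token still
        // identifies the concrete format class at runtime.
        System.out.println(cls.getSimpleName());                    // prints SequenceFileOutputFormat
        System.out.println(OutputFormat.class.isAssignableFrom(cls)); // prints true
    }
}
```

Passing scala.runtime.Nothing$ as the output format (what the crash above shows) is exactly what this token avoids: with the cast, JobConf.getOutputFormat sees a real OutputFormat subclass.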
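Yana's dstream.foreachRDD(rdd => rdd.saveAs....) pattern runs one action per micro-batch. A minimal sketch of that shape, using hypothetical FakeDStream/FakeRdd stand-ins (not the real Spark classes) so it compiles and runs without Spark on the classpath:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a Spark RDD: just an immutable batch of records.
class FakeRdd<T> {
    private final List<T> data;
    FakeRdd(List<T> data) { this.data = data; }
    List<T> collect() { return data; }
}

// Hypothetical stand-in for a DStream: a sequence of micro-batches.
class FakeDStream<T> {
    private final List<FakeRdd<T>> batches;
    FakeDStream(List<FakeRdd<T>> batches) { this.batches = batches; }
    // foreachRDD invokes the given action once per micro-batch RDD.
    void foreachRDD(Consumer<FakeRdd<T>> action) {
        for (FakeRdd<T> rdd : batches) action.accept(rdd);
    }
}

public class ForeachRddDemo {
    public static void main(String[] args) {
        FakeDStream<String> stream = new FakeDStream<>(Arrays.asList(
                new FakeRdd<>(Arrays.asList("a", "b")),
                new FakeRdd<>(Arrays.asList("c"))));
        List<String> saved = new ArrayList<>();
        // The pattern from the thread: perform the save action on each
        // RDD inside foreachRDD, not on the DStream itself. Here the
        // "save" is a stand-in that just records what would be written.
        stream.foreachRDD(rdd -> saved.addAll(rdd.collect()));
        System.out.println(saved); // prints [a, b, c]
    }
}
```

In real Spark code the body of the lambda would be rdd.saveAsTextFile(path) or a saveAsHadoopFile call with the output format token shown earlier; no external connection objects are needed for a plain save.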