Okay, thanks Akhil!

Suhas Shekar
University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Mon, Jan 12, 2015 at 1:24 PM, Akhil Das wrote:

> There is no direct way of doing that. If you need a single file for every
> batch duration, then you can repartition the data to 1 before saving.
> Another way would be to use Hadoop's copy merge command/API (available from
> 2.0 versions).
> On 13 Jan 2015 01:08, "Su She" wrote:
>
>> Hello Everyone,
>>
>> Quick follow-up: is there any way I can append output to one file rather
>> than create a new directory/file every X milliseconds?
>>
>> Thanks!
>>
>> Suhas Shekar
>>
>> University of California, Los Angeles
>> B.A. Economics, Specialization in Computing 2014
>>
>> On Thu, Jan 8, 2015 at 11:41 PM, Su She wrote:
>>
>>> 1) Thank you everyone for the help once again... the support here is
>>> really amazing and I hope to contribute soon!
>>>
>>> 2) The solution I actually ended up using was from this thread:
>>> http://mail-archives.apache.org/mod_mbox/spark-user/201310.mbox/%3ccafnzj5ejxdgqju7nbdqy6xureq3d1pcxr+i2s99g5brcj5e...@mail.gmail.com%3E
>>>
>>> In case the thread ever goes down, the solution provided by Matei:
>>>
>>> plans.saveAsHadoopFiles("hdfs://localhost:8020/user/hue/output/completed", "csv",
>>>     String.class, String.class, (Class) TextOutputFormat.class);
>>>
>>> I had browsed a lot of similar threads that did not have answers, but
>>> found this one from quite some time ago, so I apologize for posting a
>>> question that had been answered before.
>>>
>>> 3) Akhil, I was specifying the format as "txt", but it was not
>>> compatible.
>>>
>>> Thanks for the help!
>>>
>>> On Thu, Jan 8, 2015 at 11:23 PM, Akhil Das wrote:
>>>
>>>> saveAsHadoopFiles requires you to specify the output format, which I
>>>> believe you are not specifying anywhere, and hence the program crashes.
>>>>
>>>> You could try something like this:
>>>>
>>>> Class<? extends OutputFormat<?,?>> outputFormatClass =
>>>>     (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;
>>>>
>>>> yourStream.saveAsNewAPIHadoopFiles(hdfsUrl, "/output-location",
>>>>     Text.class, Text.class, outputFormatClass);
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Fri, Jan 9, 2015 at 10:22 AM, Su She wrote:
>>>>
>>>>> Yes, I am calling saveAsHadoopFiles on the DStream. However, when
>>>>> I call print on the DStream, it works. If I had to do foreachRDD to
>>>>> saveAsHadoopFile, then why is it working for print?
>>>>>
>>>>> Also, if I am doing foreachRDD, do I need connections, or can I simply
>>>>> put the saveAsHadoopFiles inside the foreachRDD function?
>>>>>
>>>>> Thanks Yana for the help! I will play around with foreachRDD and
>>>>> report my results.
>>>>>
>>>>> On Thu, Jan 8, 2015 at 6:06 PM, Yana Kadiyska wrote:
>>>>>
>>>>>> Are you calling saveAsTextFiles on the DStream? Looks like it.
>>>>>> Look at the section called "Design Patterns for using foreachRDD" in the
>>>>>> link you sent: you want to do dstream.foreachRDD(rdd =>
>>>>>> rdd.saveAs....)
>>>>>>
>>>>>> On Thu, Jan 8, 2015 at 5:20 PM, Su She wrote:
>>>>>>
>>>>>>> Hello Everyone,
>>>>>>>
>>>>>>> Thanks in advance for the help!
>>>>>>>
>>>>>>> I successfully got my Kafka/Spark WordCount app to print locally.
>>>>>>> However, I want to run it on a cluster, which means that I will have to
>>>>>>> save it to HDFS if I want to be able to read the output.
>>>>>>>
>>>>>>> I am running Spark 1.1.0, so according to this document:
>>>>>>> https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html
>>>>>>>
>>>>>>> I should be able to use commands such as saveAsTextFiles/saveAsHadoopFiles.
>>>>>>>
>>>>>>> 1) When I try saveAsTextFiles it says:
>>>>>>> cannot find symbol
>>>>>>> [ERROR] symbol : method saveAsTextFiles(java.lang.String,java.lang.String)
>>>>>>> [ERROR] location: class org.apache.spark.streaming.api.java.JavaPairDStream<java.lang.String,java.lang.Integer>
>>>>>>>
>>>>>>> This makes some sense, as saveAsTextFiles is not included here:
>>>>>>>
>>>>>>> http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html
>>>>>>>
>>>>>>> 2) When I try
>>>>>>> saveAsHadoopFiles("hdfs://ip....us-west-1.compute.internal:8020/user/testwordcount", "txt")
>>>>>>> it builds, but when I run it, it throws this exception:
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat
>>>>>>>     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079)
>>>>>>>     at org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712)
>>>>>>>     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021)
>>>>>>>     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
>>>>>>>     at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632)
>>>>>>>     at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630)
>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>     at scala.util.Try$.apply(Try.scala:161)
>>>>>>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>>>>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>     at java.lang.Thread.run(Thread.java:724)
>>>>>>> Caused by: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat
>>>>>>>     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2073)
>>>>>>>     ... 14 more
>>>>>>>
>>>>>>> Any help is really appreciated! Thanks.
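In this thread the second string argument to saveAsHadoopFiles ("txt", "csv", "/output-location") is a file-name suffix, not an output format; the format comes from the class arguments, as in Matei's fix. The Spark Streaming programming guide documents each batch's output name as prefix-TIME_IN_MS[.suffix], which is why a new output directory (holding Hadoop part-NNNNN files) appears every batch interval. The sketch below only reproduces that naming rule with the Java standard library so the role of each argument is easy to see. The class and method names, the start time, and the 2-second batch interval are hypothetical; this is not Spark's own code.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the per-batch naming rule "prefix-TIME_IN_MS[.suffix]" described in the
 * Spark Streaming programming guide for saveAs*Files(prefix, suffix).
 * Hypothetical helper for illustration only; not Spark's own code.
 */
public class BatchOutputNames {

    // Output path for a batch with time batchTimeMs; the suffix is optional.
    static String batchPath(String prefix, String suffix, long batchTimeMs) {
        String path = prefix + "-" + batchTimeMs;
        if (suffix == null || suffix.isEmpty()) {
            return path;
        }
        return path + "." + suffix;
    }

    // Paths produced by `batches` consecutive batches, starting at startMs.
    static List<String> pathsForBatches(String prefix, String suffix,
                                        long startMs, long batchIntervalMs, int batches) {
        List<String> paths = new ArrayList<>();
        for (int i = 0; i < batches; i++) {
            paths.add(batchPath(prefix, suffix, startMs + i * batchIntervalMs));
        }
        return paths;
    }

    public static void main(String[] args) {
        // Prefix and suffix from Matei's snippet; start time and 2 s interval are made up.
        String prefix = "hdfs://localhost:8020/user/hue/output/completed";
        for (String p : pathsForBatches(prefix, "csv", 1420761600000L, 2000L, 3)) {
            System.out.println(p);  // each path names a directory of part-NNNNN files
        }
    }
}
```

Under this rule, Akhil's snippet, which passes hdfsUrl as the prefix and "/output-location" as the suffix, would produce names like hdfsUrl-TIME_IN_MS./output-location, so the location part most likely belongs in the prefix.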

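For the last question (one file instead of a new directory per batch), Akhil mentions two options: repartitioning each batch to a single partition before saving, which gives one file per batch, or merging afterwards with Hadoop's copy merge. The sketch below illustrates only the merge idea, on the local filesystem with java.nio.file rather than HDFS: it concatenates the part-* files of every prefix-TIME_IN_MS[.suffix] batch directory, oldest batch first, into one target file. It is not Hadoop's FileUtil.copyMerge (which merges the files of a single directory through a Hadoop FileSystem); the class name, the directory layout, and the sample data are assumptions for illustration.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * Local-filesystem sketch of merging per-batch streaming output into one file.
 * Hypothetical helper; not Hadoop's FileUtil.copyMerge.
 */
public class LocalCopyMerge {

    // Extracts TIME_IN_MS from a directory named "<prefixName>-TIME_IN_MS[.suffix]".
    // Assumes every matching directory follows that pattern.
    static long batchTime(Path dir, String prefixName) {
        String rest = dir.getFileName().toString().substring(prefixName.length() + 1);
        int dot = rest.indexOf('.');
        return Long.parseLong(dot < 0 ? rest : rest.substring(0, dot));
    }

    // Entries of dir matching the glob that are directories (wantDirs == true)
    // or regular files (wantDirs == false).
    static List<Path> matching(Path dir, String glob, boolean wantDirs) throws IOException {
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path p : stream) {
                if (wantDirs ? Files.isDirectory(p) : Files.isRegularFile(p)) {
                    result.add(p);
                }
            }
        }
        return result;
    }

    // Writes the part-* files of every batch directory under outputRoot into target,
    // oldest batch first and part-00000, part-00001, ... within a batch.
    // Marker files such as _SUCCESS are skipped because they do not match "part-*".
    // The target is created or overwritten and must not itself match "<prefixName>-*".
    static void mergeBatches(Path outputRoot, String prefixName, Path target) throws IOException {
        List<Path> batchDirs = matching(outputRoot, prefixName + "-*", true);
        batchDirs.sort(Comparator.comparingLong(d -> batchTime(d, prefixName)));
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path dir : batchDirs) {
                List<Path> parts = matching(dir, "part-*", false);
                Collections.sort(parts);
                for (Path part : parts) {
                    Files.copy(part, out);
                }
            }
        }
    }

    // Builds a small sample tree in a temporary directory (left in place),
    // merges it, and returns the merged contents.
    static String demo() {
        try {
            Path root = Files.createTempDirectory("batches");
            Path later = Files.createDirectory(root.resolve("counts-2000.txt"));
            Path earlier = Files.createDirectory(root.resolve("counts-1000.txt"));
            Files.write(earlier.resolve("part-00001"), "b 1\n".getBytes(StandardCharsets.UTF_8));
            Files.write(earlier.resolve("part-00000"), "a 2\n".getBytes(StandardCharsets.UTF_8));
            Files.write(later.resolve("part-00000"), "c 3\n".getBytes(StandardCharsets.UTF_8));
            Files.write(later.resolve("_SUCCESS"), new byte[0]);
            Path merged = root.resolve("merged.txt");
            mergeBatches(root, "counts", merged);
            return new String(Files.readAllBytes(merged), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

The target is rewritten on every run, so this suits a periodic or end-of-job merge rather than appending from inside each batch.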