Okay, thanks Akhil!

Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Mon, Jan 12, 2015 at 1:24 PM, Akhil Das <[email protected]>
wrote:

> There is no direct way of doing that. If you need a Single file for every
> batch duration, then you can repartition the data to 1 before saving.
> Another way would be to use hadoop's copy merge command/api(available from
> 2.0 versions)
> On 13 Jan 2015 01:08, "Su She" <[email protected]> wrote:
>
>> Hello Everyone,
>>
>> Quick followup, is there any way I can append output to one file rather
>> then create a new directory/file every X milliseconds?
>>
>> Thanks!
>>
>> Suhas Shekar
>>
>> University of California, Los Angeles
>> B.A. Economics, Specialization in Computing 2014
>>
>> On Thu, Jan 8, 2015 at 11:41 PM, Su She <[email protected]> wrote:
>>
>>> 1) Thank you everyone for the help once again...the support here is
>>> really amazing and I hope to contribute soon!
>>>
>>> 2) The solution I actually ended up using was from this thread:
>>> http://mail-archives.apache.org/mod_mbox/spark-user/201310.mbox/%3ccafnzj5ejxdgqju7nbdqy6xureq3d1pcxr+i2s99g5brcj5e...@mail.gmail.com%3E
>>>
>>> in case the thread ever goes down, the soln provided by Matei:
>>>
>>>
>>> plans.saveAsHadoopFiles("hdfs://localhost:8020/user/hue/output/completed","csv",
>>> String.class, String.class, (Class) TextOutputFormat.class);
>>>
>>> I had browsed a lot of similar threads that did not have answers, but
>>> found this one from quite some time ago, so apologize for posting a
>>> question that had been answered before.
>>>
>>> 3) Akhil, I was specifying the format as "txt", but it was not
>>> compatible
>>>
>>> Thanks for the help!
>>>
>>>
>>> On Thu, Jan 8, 2015 at 11:23 PM, Akhil Das <[email protected]>
>>> wrote:
>>>
>>>> saveAsHadoopFiles requires you to specify the output format which i
>>>> believe you are not specifying anywhere and hence the program crashes.
>>>>
>>>> You could try something like this:
>>>>
>>>> Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends
>>>> OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;
>>>> 46
>>>>
>>>> yourStream.saveAsNewAPIHadoopFiles(hdfsUrl,
>>>> "/output-location",Text.class, Text.class, outputFormatClass);
>>>>
>>>>
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Fri, Jan 9, 2015 at 10:22 AM, Su She <[email protected]> wrote:
>>>>
>>>>> Yes, I am calling the saveAsHadoopFiles on the Dstream. However, when
>>>>> I call print on the Dstream it works? If I had to do foreachRDD to
>>>>> saveAsHadoopFile, then why is it working for print?
>>>>>
>>>>> Also, if I am doing foreachRDD, do I need connections, or can I simply
>>>>> put the saveAsHadoopFiles inside the foreachRDD function?
>>>>>
>>>>> Thanks Yana for the help! I will play around with foreachRDD and
>>>>> convey my results.
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jan 8, 2015 at 6:06 PM, Yana Kadiyska <[email protected]
>>>>> > wrote:
>>>>>
>>>>>> are you calling the saveAsText files on the DStream --looks like it?
>>>>>> Look at the section called "Design Patterns for using foreachRDD" in the
>>>>>> link you sent -- you want to do  dstream.foreachRDD(rdd =>
>>>>>> rdd.saveAs....)
>>>>>>
>>>>>> On Thu, Jan 8, 2015 at 5:20 PM, Su She <[email protected]> wrote:
>>>>>>
>>>>>>> Hello Everyone,
>>>>>>>
>>>>>>> Thanks in advance for the help!
>>>>>>>
>>>>>>> I successfully got my Kafka/Spark WordCount app to print locally.
>>>>>>> However, I want to run it on a cluster, which means that I will have to
>>>>>>> save it to HDFS if I want to be able to read the output.
>>>>>>>
>>>>>>> I am running Spark 1.1.0, which means according to this document:
>>>>>>> https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html
>>>>>>>
>>>>>>> I should be able to use commands such as saveAsText/HadoopFiles.
>>>>>>>
>>>>>>> 1) When I try saveAsTextFiles it says:
>>>>>>> cannot find symbol
>>>>>>> [ERROR] symbol  : method
>>>>>>> saveAsTextFiles(java.lang.String,java.lang.String)
>>>>>>> [ERROR] location: class
>>>>>>> org.apache.spark.streaming.api.java.JavaPairDStream<java.lang.String,java.lang.Integer>
>>>>>>>
>>>>>>> This makes some sense as saveAsTextFiles is not included here:
>>>>>>>
>>>>>>> http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html
>>>>>>>
>>>>>>> 2) When I try
>>>>>>> saveAsHadoopFiles("hdfs://ip....us-west-1.compute.internal:8020/user/testwordcount",
>>>>>>> "txt") it builds, but when I try running it it throws this exception:
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.RuntimeException:
>>>>>>> java.lang.RuntimeException: class scala.runtime.Nothing$ not
>>>>>>> org.apache.hadoop.mapred.OutputFormat
>>>>>>>         at
>>>>>>> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079)
>>>>>>>         at
>>>>>>> org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712)
>>>>>>>         at
>>>>>>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021)
>>>>>>>         at
>>>>>>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
>>>>>>>         at
>>>>>>> org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632)
>>>>>>>         at
>>>>>>> org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630)
>>>>>>>         at
>>>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
>>>>>>>         at
>>>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>         at
>>>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>         at scala.util.Try$.apply(Try.scala:161)
>>>>>>>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>>>>>         at
>>>>>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
>>>>>>>         at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>         at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>         at java.lang.Thread.run(Thread.java:724)
>>>>>>> Caused by: java.lang.RuntimeException: class scala.runtime.Nothing$
>>>>>>> not org.apache.hadoop.mapred.OutputFormat
>>>>>>>         at
>>>>>>> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2073)
>>>>>>>         ... 14 more
>>>>>>>
>>>>>>>
>>>>>>> Any help is really appreciated! Thanks.
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Reply via email to