Hi Sebastian,

Yes... my mistake... you are right. Every partition will create a different file.
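For the archives, here is a minimal sketch of what this converges to: write each batch under a per-minute directory and let saveAsNewAPIHadoopFile create one part file per partition. This uses the Spark 1.4 Java API; "baseDir" and the minute formatting are my own placeholders, not anything Spark provides:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    // "stream" is assumed to be a JavaPairDStream<String, String>
    stream.foreachRDD((rdd, time) -> {
        // Folder named after the minute of the batch, e.g. ".../1200"
        String minute = new SimpleDateFormat("HHmm")
            .format(new Date(time.milliseconds()));
        // Append the batch timestamp so five-second batches within the
        // same minute do not overwrite each other's part files
        String outputDir = baseDir + "/" + minute + "/" + time.milliseconds();
        rdd.saveAsNewAPIHadoopFile(outputDir, String.class, String.class,
            TextOutputFormat.class);
        return null; // Spark 1.4's Java foreachRDD expects a Void return
    });

Within a single save, each partition writes its own part-r-00000, part-r-00001, and so on, so nothing collides; the per-batch subdirectory takes care of collisions across batches.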
Shridhar

On Fri, Mar 25, 2016 at 6:58 PM, Sebastian Piu <sebastian....@gmail.com> wrote:

> I don't understand the race condition comment you mention. Have you seen
> this somewhere? That timestamp will be the same on each worker for that
> RDD, and each worker is handling a different partition, which will be
> reflected in the filename, so no data will be overwritten. In fact, this
> is what saveAsNewAPIHadoopFiles on a DStream does, as far as I recall.
>
> On Fri, 25 Mar 2016, 11:22 vetal king, <greenve...@gmail.com> wrote:
>
>> Hi Surendra,
>>
>> Thanks for your suggestion. I tried MultipleOutputFormat and
>> MultipleTextOutputFormat, but the result was the same: the folder always
>> contains a single file, part-r-00000, and this file gets overwritten
>> every time.
>>
>> This is how I am invoking the API:
>>
>>     dataToWrite.saveAsHadoopFile(directory, String.class,
>>         String.class, MultipleOutputFormat.class);
>>
>> Am I missing something?
>>
>> Regards,
>> Shridhar
>>
>> On Wed, Mar 23, 2016 at 11:55 AM, Surendra , Manchikanti <
>> surendra.manchika...@gmail.com> wrote:
>>
>>> Hi Vetal,
>>>
>>> You may try MultipleOutputFormat instead of TextOutputFormat in
>>> saveAsNewAPIHadoopFile().
>>>
>>> Regards,
>>> Surendra M
>>>
>>> -- Surendra Manchikanti
>>>
>>> On Tue, Mar 22, 2016 at 10:26 AM, vetal king <greenve...@gmail.com>
>>> wrote:
>>>
>>>> We are using Spark 1.4 for Spark Streaming, with Kafka as the data
>>>> source for the stream.
>>>>
>>>> Records are published to Kafka every second. Our requirement is to
>>>> store the records published to Kafka in a single folder per minute. The
>>>> stream reads records every five seconds. For instance, records
>>>> published between 12:00 PM and 12:01 PM are stored in folder "1200";
>>>> records published between 12:01 PM and 12:02 PM in folder "1201"; and
>>>> so on.
>>>>
>>>> The code I wrote is as follows:
>>>>
>>>>     // First, group records in the RDD by target folder
>>>>     stream.foreachRDD(rddWithinStream -> {
>>>>         JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
>>>>             rddWithinStream.mapToPair(t ->
>>>>                 new Tuple2<String, String>(targetHadoopFolder, t._2())
>>>>             ).groupByKey();
>>>>         // All records are now grouped by the folder they will be stored in
>>>>
>>>>         // Create an RDD for each target folder
>>>>         for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
>>>>             JavaPairRDD<String, Iterable<String>> rddByKey =
>>>>                 rddGroupedByDirectory.filter(groupedTuples ->
>>>>                     groupedTuples._1().equals(hadoopFolder));
>>>>
>>>>             // And store it in Hadoop
>>>>             rddByKey.saveAsNewAPIHadoopFile(directory, String.class,
>>>>                 String.class, TextOutputFormat.class);
>>>>         }
>>>>         return null;
>>>>     });
>>>>
>>>> Since the stream processes data every five seconds,
>>>> saveAsNewAPIHadoopFile gets invoked multiple times per minute. This
>>>> causes the "part-00000" file to be overwritten every time.
>>>>
>>>> I was expecting that, in the directory specified by the "directory"
>>>> parameter, saveAsNewAPIHadoopFile would keep creating part-0000N files
>>>> even though I have a single worker node.
>>>>
>>>> Any help/alternatives are greatly appreciated.
>>>>
>>>> Thanks.
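PS: on the MultipleOutputFormat attempt further up the thread: MultipleOutputFormat is abstract, so as far as I can tell passing it straight into saveAsHadoopFile cannot work. The usual approach is to subclass MultipleTextOutputFormat (from the old org.apache.hadoop.mapred.lib API, which is what saveAsHadoopFile expects) and derive the file name from the key. A sketch, assuming the key already carries the target folder name (the class name is mine):

    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

    // Routes each record to <key>/<default-part-name> under the output
    // directory, so records keyed "1200" land in the 1200/ folder
    public class FolderPerMinuteOutputFormat
            extends MultipleTextOutputFormat<String, String> {
        @Override
        protected String generateFileNameForKeyValue(String key, String value,
                String name) {
            return key + "/" + name; // "name" is e.g. "part-00000"
        }
    }

Then save with dataToWrite.saveAsHadoopFile(directory, String.class, String.class, FolderPerMinuteOutputFormat.class). Note the part file names still repeat across batches, so the batch time needs to be folded into the returned path (or the output directory) as in the sketch above.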