Hi Vetal,

You could try MultiOutPutFormat instead of TextOutputFormat in saveAsNewAPIHadoopFile().
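Another option that might be worth trying, separate from changing the output format, is to give every batch its own output directory under the minute folder, so that no two calls to saveAsNewAPIHadoopFile() write to the same path. Below is a rough sketch of the path computation only, in plain Java with no Spark dependency. The class name MinuteFolders, the methods folderFor and batchOutputDir, the base path /data/records, the "batch-" prefix, and the use of UTC are all made up for illustration; the record and batch timestamps are assumed to be epoch milliseconds.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: none of these names come from Spark or Hadoop.
final class MinuteFolders {
    // 24-hour clock, hour and minute only, e.g. "1200" or "1201".
    private static final DateTimeFormatter HHMM = DateTimeFormatter.ofPattern("HHmm");

    // Name of the per-minute folder that a record with this timestamp belongs to.
    static String folderFor(long epochMillis, ZoneId zone) {
        return HHMM.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    // Directory for one batch: <baseDir>/<HHmm>/batch-<batchMillis>.
    // The batch timestamp makes the path different for every batch in the same minute.
    static String batchOutputDir(String baseDir, long recordMillis, long batchMillis, ZoneId zone) {
        return baseDir + "/" + folderFor(recordMillis, zone) + "/batch-" + batchMillis;
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneOffset.UTC; // assumption: folder names are derived in UTC
        long noon = Instant.parse("2016-03-22T12:00:00Z").toEpochMilli();

        // Two five-second batches in the same minute share the "1200" folder
        // but end up in different batch directories.
        System.out.println(batchOutputDir("/data/records", noon + 4_000, noon + 5_000, utc));
        System.out.println(batchOutputDir("/data/records", noon + 9_000, noon + 10_000, utc));
    }
}
```

The string returned by batchOutputDir would be passed as the first argument of saveAsNewAPIHadoopFile() in place of a fixed directory. The part files then sit one level below the minute folder rather than directly inside it, which may or may not be acceptable for your requirement.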
Regards,
Surendra M

--
Surendra Manchikanti

On Tue, Mar 22, 2016 at 10:26 AM, vetal king <greenve...@gmail.com> wrote:

> We are using Spark 1.4 for Spark Streaming. Kafka is data source for the
> Spark Stream.
>
> Records are published on Kafka every second. Our requirement is to store
> records published on Kafka in a single folder per minute. The stream will
> read records every five seconds. For instance records published during 1200
> PM and 1201PM are stored in folder "1200"; between 1201PM and 1202PM in
> folder "1201" and so on.
>
> The code I wrote is as follows
>
> //First Group records in RDD by date
> stream.foreachRDD (rddWithinStream -> {
>     JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
>         rddWithinStream.mapToPair(t -> {
>             return new Tuple2<String, String> (targetHadoopFolder, t._2());
>         }).groupByKey();
>     // All records grouped by folders they will be stored in
>
>
>     // Create RDD for each target folder.
>     for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
>         JavaPairRDD <String, Iterable<String>> rddByKey =
>             rddGroupedByDirectory.filter(groupedTuples -> {
>                 return groupedTuples._1().equals(hadoopFolder);
>             });
>
>         // And store it in Hadoop
>         rddByKey.saveAsNewAPIHadoopFile(directory, String.class, String.class,
>             TextOutputFormat.class);
>     }
>
> Since the Stream processes data every five seconds, saveAsNewAPIHadoopFile
> gets invoked multiple times in a minute. This causes "Part-00000" file to
> be overwritten every time.
>
> I was expecting that in the directory specified by "directory" parameter,
> saveAsNewAPIHadoopFile will keep creating part-0000N file even when I've a
> single worker node.
>
> Any help/alternatives are greatly appreciated.
>
> Thanks.
>