Hi Vetal,

You may try MultipleTextOutputFormat instead of TextOutputFormat. Note that
it lives in the old mapred API (org.apache.hadoop.mapred.lib), so it goes
with saveAsHadoopFile() rather than saveAsNewAPIHadoopFile().
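
Something along these lines might work (a rough, untested sketch;
PerMinuteOutputFormat and the output path are made-up names for
illustration, and it assumes the pair key already holds the minute-based
folder name such as "1200"):

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

// Routes each record into a subfolder named after its key, i.e. the
// per-minute folder name ("1200", "1201", ...) used as the pair key.
public class PerMinuteOutputFormat
        extends MultipleTextOutputFormat<String, String> {
    @Override
    protected String generateFileNameForKeyValue(String key, String value,
                                                 String name) {
        // "name" is the default part file name (part-00000, ...).
        // Appending a per-batch suffix here (e.g. the batch time) would
        // keep successive micro-batches from overwriting each other.
        return key + "/" + name;
    }
}

// Then, inside foreachRDD, the whole batch can be saved in one call,
// without filtering per key ("/output/base" is a placeholder path):
pairRdd.saveAsHadoopFile("/output/base", String.class, String.class,
        PerMinuteOutputFormat.class);

This also avoids the collect() + filter() loop, since the output format
itself fans records out to the right folders.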

Regards,
Surendra M

-- Surendra Manchikanti

On Tue, Mar 22, 2016 at 10:26 AM, vetal king <greenve...@gmail.com> wrote:

> We are using Spark 1.4 for Spark Streaming. Kafka is the data source for
> the stream.
>
> Records are published to Kafka every second. Our requirement is to store
> the records published to Kafka during each minute in a single folder. The
> stream reads records every five seconds. For instance, records published
> between 12:00 PM and 12:01 PM are stored in folder "1200", records between
> 12:01 PM and 12:02 PM in folder "1201", and so on.
>
> The code I wrote is as follows:
>
> // First, group records in the RDD by the folder they belong to
> stream.foreachRDD(rddWithinStream -> {
>     JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
>         rddWithinStream.mapToPair(t -> {
>             return new Tuple2<String, String>(targetHadoopFolder, t._2());
>         }).groupByKey();
>     // All records are now grouped by the folders they will be stored in
>
>     // Create an RDD for each target folder.
>     for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
>         JavaPairRDD<String, Iterable<String>> rddByKey =
>             rddGroupedByDirectory.filter(groupedTuples -> {
>                 return groupedTuples._1().equals(hadoopFolder);
>             });
>
>         // And store it in Hadoop
>         rddByKey.saveAsNewAPIHadoopFile(directory, String.class,
>             String.class, TextOutputFormat.class);
>     }
> });
>
> Since the stream processes data every five seconds, saveAsNewAPIHadoopFile
> gets invoked multiple times per minute. This causes the "part-00000" file
> to be overwritten every time.
>
> I was expecting that, in the directory specified by the "directory"
> parameter, saveAsNewAPIHadoopFile would keep creating new part-0000N files
> even when I have a single worker node.
>
> Any help/alternatives are greatly appreciated.
>
> Thanks.
>
