We are using Spark 1.4 for Spark Streaming. Kafka is the data source for the
stream.
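
For context, the stream is created along these lines (a sketch of the Kafka 0.8
direct approach available in Spark 1.4; the application name, broker address and
topic name are placeholders):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

SparkConf conf = new SparkConf().setAppName("kafka-minute-folders"); // placeholder name
// Five-second batch interval, matching the read frequency described below
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "broker1:9092"); // placeholder broker
Set<String> topics = Collections.singleton("records");   // placeholder topic

// Each stream element is a (key, value) pair; t._2() in the code below is the record value
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topics);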

Records are published to Kafka every second. Our requirement is to store the
records published to Kafka in a single folder per minute; the stream reads
records every five seconds. For instance, records published between 12:00 PM
and 12:01 PM are stored in folder "1200", records published between 12:01 PM
and 12:02 PM in folder "1201", and so on.

The code I wrote is as follows:

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// First, group the records in the RDD by the folder they should be stored in
stream.foreachRDD(rddWithinStream -> {
    JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory = rddWithinStream
        .mapToPair(t -> new Tuple2<String, String>(targetHadoopFolder, t._2()))
        .groupByKey();
    // All records are now grouped by the folder they will be stored in

    // Create an RDD for each target folder.
    for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
        JavaPairRDD<String, Iterable<String>> rddByKey = rddGroupedByDirectory
            .filter(groupedTuples -> groupedTuples._1().equals(hadoopFolder));

        // And store it in Hadoop
        rddByKey.saveAsNewAPIHadoopFile(hadoopFolder, String.class,
                String.class, TextOutputFormat.class);
    }

    return null; // foreachRDD takes a Function<R, Void> in Spark 1.4
});

Since the stream processes data every five seconds, saveAsNewAPIHadoopFile gets
invoked multiple times per minute, and the "part-00000" file in the target
folder is overwritten each time.

I was expecting that saveAsNewAPIHadoopFile would keep creating new part-0000N
files in the target folder, even though I have a single worker node.
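
For illustration, one possible way around this could be to give each five-second
batch its own subdirectory under the minute folder, so part files from different
batches never collide (sketch only, building on the code above; batchTimeMillis
is an assumed value such as System.currentTimeMillis() captured per batch):

// Sketch only: one subdirectory per five-second batch under the minute folder,
// so successive saveAsNewAPIHadoopFile calls cannot overwrite each other's part files.
// batchTimeMillis is assumed (e.g. System.currentTimeMillis() captured per batch).
String batchOutputDir = hadoopFolder + "/batch-" + batchTimeMillis;
rddByKey.saveAsNewAPIHadoopFile(batchOutputDir, String.class,
        String.class, TextOutputFormat.class);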

Any help/alternatives are greatly appreciated.

Thanks.
