If you want one-minute granularity, why not use a one-minute batch interval? Each batch then corresponds to exactly one output folder, so the save only runs once per minute. Also, HDFS is not a great match for this kind of workload because of the small-files problem.
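For concreteness, here is a rough, untested sketch of that suggestion against the Spark 1.4 Java API: a 60-second batch interval, with each batch written to a folder derived from the batch time rather than a fixed path. The base path "hdfs:///data", the "HHmm" folder pattern, and the buildKafkaStream placeholder are my own illustrative assumptions, not your code; wire in your existing Kafka setup there.

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class MinuteFolders {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("minute-folders");

    // One micro-batch per minute, so saveAsNewAPIHadoopFile runs exactly
    // once per output folder and nothing gets overwritten.
    JavaStreamingContext jssc =
        new JavaStreamingContext(conf, Durations.minutes(1));

    // Placeholder: build the Kafka pair DStream the same way your
    // existing code does (KafkaUtils.createDirectStream / createStream).
    JavaPairDStream<String, String> stream = buildKafkaStream(jssc);

    stream.foreachRDD((rdd, time) -> {
      // Name the output folder after the batch time, e.g. ".../1200".
      String folder = "hdfs:///data/"
          + new SimpleDateFormat("HHmm").format(new Date(time.milliseconds()));
      rdd.saveAsNewAPIHadoopFile(folder, String.class, String.class,
          TextOutputFormat.class);
      return null; // Spark 1.4's foreachRDD takes Function2<..., Time, Void>
    });

    jssc.start();
    jssc.awaitTermination();
  }

  private static JavaPairDStream<String, String> buildKafkaStream(
      JavaStreamingContext jssc) {
    // Hypothetical stand-in for your existing Kafka wiring.
    throw new UnsupportedOperationException("wire up Kafka here");
  }
}

If you must keep the five-second batches, a variant of the same idea is to append the batch time to the minute folder (e.g. ".../1200/" + time.milliseconds()), so each save call writes its own subdirectory instead of overwriting part-00000. Either way, note that one folder per minute still accumulates lots of small files on HDFS over time, which is the caveat above; periodic compaction may be worth considering.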
On Tue, Mar 22, 2016 at 12:26 PM, vetal king <greenve...@gmail.com> wrote:
> We are using Spark 1.4 for Spark Streaming. Kafka is the data source for
> the Spark stream.
>
> Records are published on Kafka every second. Our requirement is to store
> records published on Kafka in a single folder per minute. The stream will
> read records every five seconds. For instance, records published between
> 1200 PM and 1201 PM are stored in folder "1200"; between 1201 PM and
> 1202 PM in folder "1201"; and so on.
>
> The code I wrote is as follows:
>
> // First, group records in the RDD by target directory
> stream.foreachRDD(rddWithinStream -> {
>     JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
>         rddWithinStream.mapToPair(t -> {
>             return new Tuple2<String, String>(targetHadoopFolder, t._2());
>         }).groupByKey();
>     // All records grouped by the folders they will be stored in
>
>     // Create an RDD for each target folder.
>     for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
>         JavaPairRDD<String, Iterable<String>> rddByKey =
>             rddGroupedByDirectory.filter(groupedTuples -> {
>                 return groupedTuples._1().equals(hadoopFolder);
>             });
>
>         // And store it in Hadoop
>         rddByKey.saveAsNewAPIHadoopFile(directory, String.class, String.class,
>             TextOutputFormat.class);
>     }
> });
>
> Since the stream processes data every five seconds, saveAsNewAPIHadoopFile
> gets invoked multiple times in a minute. This causes the "part-00000" file
> to be overwritten every time.
>
> I was expecting that in the directory specified by the "directory"
> parameter, saveAsNewAPIHadoopFile would keep creating part-0000N files even
> when I have a single worker node.
>
> Any help/alternatives are greatly appreciated.
>
> Thanks.