Hi Cody,

Thanks for your reply.
A five-second batch and a one-minute publishing interval are just a representative example. What we want is to group data over a certain frequency, and that frequency is configurable. One way we think it can be achieved is to create a "directory" per this frequency, and inside that directory create a subfolder each time the stream receives data, something like:

    rddByKey.saveAsNewAPIHadoopFile(directory + "-" + <current time in milliseconds OR some random number>,
        String.class, String.class, TextOutputFormat.class);

But I think that results in too deeply nested a directory structure, and it also sounds inefficient, since there will be a lot of small files. A rough sketch of what I mean is at the end of this mail, below the quoted thread.

Shridhar

On Tue, Mar 22, 2016 at 11:00 PM, Cody Koeninger <c...@koeninger.org> wrote:
> If you want 1 minute granularity, why not use a 1 minute batch time?
>
> Also, HDFS is not a great match for this kind of thing, because of the
> small files issue.
>
> On Tue, Mar 22, 2016 at 12:26 PM, vetal king <greenve...@gmail.com> wrote:
> > We are using Spark 1.4 for Spark Streaming. Kafka is the data source for
> > the stream.
> >
> > Records are published on Kafka every second. Our requirement is to store
> > records published on Kafka in a single folder per minute. The stream
> > reads records every five seconds. For instance, records published between
> > 12:00 PM and 12:01 PM are stored in folder "1200", records published
> > between 12:01 PM and 12:02 PM in folder "1201", and so on.
> >
> > The code I wrote is as follows:
> >
> >     // First, group records in the RDD by the folder they will be stored in.
> >     stream.foreachRDD(rddWithinStream -> {
> >         JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
> >             rddWithinStream.mapToPair(t -> {
> >                 return new Tuple2<String, String>(targetHadoopFolder, t._2());
> >             }).groupByKey();
> >
> >         // Create an RDD for each target folder.
> >         for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
> >             JavaPairRDD<String, Iterable<String>> rddByKey =
> >                 rddGroupedByDirectory.filter(groupedTuples -> {
> >                     return groupedTuples._1().equals(hadoopFolder);
> >                 });
> >
> >             // And store it in Hadoop.
> >             rddByKey.saveAsNewAPIHadoopFile(directory, String.class, String.class,
> >                 TextOutputFormat.class);
> >         }
> >     });
> >
> > Since the stream processes data every five seconds, saveAsNewAPIHadoopFile
> > gets invoked multiple times per minute. This causes the "part-00000" file
> > to be overwritten every time.
> >
> > I was expecting that, in the directory specified by the "directory"
> > parameter, saveAsNewAPIHadoopFile would keep creating part-0000N files
> > even though I have a single worker node.
> >
> > Any help/alternatives are greatly appreciated.
> >
> > Thanks.
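P.S. Here is a minimal, untested sketch of the suffix-per-batch idea, in case it helps the discussion. The names "stream" (the JavaPairDStream<String, String> read from Kafka) and "baseDir" (the configurable target root) are placeholders, and deriving the minute folder from the batch time rather than from each record is an assumption:

    import java.text.SimpleDateFormat;
    import java.util.Date;

    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    // One folder per configured frequency (here one per minute), plus a unique
    // sub-path per micro-batch so earlier part-0000N files are never overwritten.
    stream.foreachRDD((rdd, batchTime) -> {
        String minuteFolder = new SimpleDateFormat("yyyyMMddHHmm")
                .format(new Date(batchTime.milliseconds()));
        String outputPath = baseDir + "/" + minuteFolder + "/" + batchTime.milliseconds();

        if (!rdd.isEmpty()) {
            rdd.saveAsNewAPIHadoopFile(outputPath, String.class, String.class,
                TextOutputFormat.class);
        }
        return null; // the Function2 overload of foreachRDD returns Void in Spark 1.4
    });

Each five-second batch still produces its own small files under the minute folder, so this only avoids the overwriting; it does not address the small-files concern.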