Hi Sebastian,
Thanks for your reply. I think using rdd.timestamp may cause one issue: if
the parallelized RDD is executed on more than one worker, there may be a
race condition when rdd.timestamp is used, and it may also result in the
file being overwritten.

Shridhar

On Wed, Mar 23, 2016 at 12:32 AM, Sebastian Piu <sebastian....@gmail.com> wrote:

> As you said, create a folder for each different minute; you can also use
> rdd.time as a timestamp.
>
> Also, you might want to have a look at the window function for the batching.
>
>
> On Tue, 22 Mar 2016, 17:43 vetal king, <greenve...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> Thanks for your reply.
>>
>> The five-second batch and one-minute publishing interval is just a
>> representative example. What we want is to group data over a certain
>> frequency, and that frequency is configurable. One way we think it can be
>> achieved is that a "directory" will be created per this frequency, and in
>> this directory we will create folders when the stream receives data.
>> Something like:
>>
>> rddByKey.saveAsNewAPIHadoopFile(directory + "-" + <current time in
>> milliseconds OR some random number>, String.class, String.class,
>> TextOutputFormat.class);
>>
>> But I think it will be too much of a nested directory structure, and it
>> sounds too inefficient as well, since there will be a lot of small files.
>>
>> Shridhar
>>
>>
>> On Tue, Mar 22, 2016 at 11:00 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> If you want 1 minute granularity, why not use a 1 minute batch time?
>>>
>>> Also, HDFS is not a great match for this kind of thing, because of the
>>> small files issue.
>>>
>>> On Tue, Mar 22, 2016 at 12:26 PM, vetal king <greenve...@gmail.com> wrote:
>>> > We are using Spark 1.4 for Spark Streaming. Kafka is the data source
>>> > for the Spark stream.
>>> >
>>> > Records are published on Kafka every second. Our requirement is to
>>> > store records published on Kafka in a single folder per minute. The
>>> > stream will read records every five seconds. For instance, records
>>> > published between 12:00 PM and 12:01 PM are stored in folder "1200",
>>> > between 12:01 PM and 12:02 PM in folder "1201", and so on.
>>> >
>>> > The code I wrote is as follows:
>>> >
>>> > // First, group records in the RDD by date
>>> > stream.foreachRDD(rddWithinStream -> {
>>> >     JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
>>> >         rddWithinStream.mapToPair(t -> {
>>> >             return new Tuple2<String, String>(targetHadoopFolder, t._2());
>>> >         }).groupByKey();
>>> >     // All records grouped by the folders they will be stored in
>>> >
>>> >     // Create an RDD for each target folder.
>>> >     for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
>>> >         JavaPairRDD<String, Iterable<String>> rddByKey =
>>> >             rddGroupedByDirectory.filter(groupedTuples -> {
>>> >                 return groupedTuples._1().equals(hadoopFolder);
>>> >             });
>>> >
>>> >         // And store it in Hadoop
>>> >         rddByKey.saveAsNewAPIHadoopFile(directory, String.class,
>>> >             String.class, TextOutputFormat.class);
>>> >     }
>>> > });
>>> >
>>> > Since the stream processes data every five seconds,
>>> > saveAsNewAPIHadoopFile gets invoked multiple times in a minute. This
>>> > causes the "part-00000" file to be overwritten every time.
>>> >
>>> > I was expecting that in the directory specified by the "directory"
>>> > parameter, saveAsNewAPIHadoopFile would keep creating part-0000N files
>>> > even when I have a single worker node.
>>> >
>>> > Any help/alternatives are greatly appreciated.
>>> >
>>> > Thanks.
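
P.S. A minimal sketch (untested) of the batch-time approach Sebastian
suggests, using the two-argument foreachRDD overload that passes the batch
Time into the callback. The class name PerMinuteSink, the method attach, and
the stream/targetHadoopFolder parameters are placeholders for illustration,
not anything from the code above:

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public class PerMinuteSink {
    public static void attach(JavaPairDStream<String, String> stream,
                               final String targetHadoopFolder) {
        // foreachRDD runs on the driver once per batch, so a single
        // SimpleDateFormat instance is fine here.
        final SimpleDateFormat minuteFormat = new SimpleDateFormat("yyyyMMdd-HHmm");

        stream.foreachRDD((rdd, time) -> {
            if (!rdd.isEmpty()) {
                // One folder per minute, derived from the batch time, plus a
                // per-batch subfolder so successive 5-second batches never
                // overwrite each other's part files.
                String minute = minuteFormat.format(new Date(time.milliseconds()));
                String path = targetHadoopFolder + "/" + minute + "/" + time.milliseconds();
                rdd.saveAsNewAPIHadoopFile(path, String.class, String.class,
                        TextOutputFormat.class);
            }
            return null; // Spark 1.4's foreachRDD expects a Void-returning function
        });
    }
}

Because the folder name is driven by the deterministic batch time rather than
wall-clock time read on the workers, every task in a batch sees the same path
and there is nothing to race on; the trade-off is that you still get one small
directory per 5-second batch.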
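
And a sketch (again untested) of the window alternative: with a 5-second batch
interval, a 60-second window and 60-second slide collect a full minute of
records into a single RDD, so each minute is written exactly once and the
small-files issue Cody mentions is reduced. Same placeholder names as above:

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public class WindowedSink {
    public static void attach(JavaPairDStream<String, String> stream,
                               final String targetHadoopFolder) {
        // Window and slide durations must both be multiples of the batch interval (5s).
        stream.window(Durations.minutes(1), Durations.minutes(1))
              .foreachRDD((rdd, time) -> {
                  if (!rdd.isEmpty()) {
                      // "time" marks the end of the one-minute window; use it as the folder name.
                      rdd.saveAsNewAPIHadoopFile(
                              targetHadoopFolder + "/" + time.milliseconds(),
                              String.class, String.class, TextOutputFormat.class);
                  }
                  return null; // Void-returning, as above
              });
    }
}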