Hi Sebastian,

Thanks for your reply. I think using rdd.timestamp may cause one issue: if
the parallelized RDD is executed on more than one worker, there may be a
race condition when rdd.timestamp is used, and it may also result in the
file being overwritten.
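
For reference, this is roughly how I understood the suggestion, i.e. the
two-argument foreachRDD overload that passes in the batch Time, with that
time appended to the output path. A rough, untested sketch only; "stream" is
our JavaPairDStream from Kafka and "baseDir" is just a placeholder:

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.streaming.Time;

final String baseDir = "/data/events";   // placeholder path

stream.foreachRDD((rdd, time) -> {
    // "time" is the batch time Spark Streaming passes into foreachRDD
    String outputDir = baseDir + "/" + time.milliseconds();
    rdd.saveAsNewAPIHadoopFile(outputDir, String.class, String.class,
        TextOutputFormat.class);
    return null; // the Spark 1.4 foreachRDD overload expects a Void return
});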

Shridhar

On Wed, Mar 23, 2016 at 12:32 AM, Sebastian Piu <sebastian....@gmail.com>
wrote:

> As you said, create a folder for each minute; you can also use rdd.time as
> the timestamp.
>
> You might also want to have a look at the window function for the batching.
>
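> Something along these lines, roughly (untested sketch, reusing your
> "stream" and "targetHadoopFolder" names):
>
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
>
> // group the 5 second micro-batches into non-overlapping 1 minute windows
> JavaPairDStream<String, String> windowed =
>     stream.window(Durations.minutes(1), Durations.minutes(1));
>
> // each RDD now covers one minute of data; "time" marks the end of the window
> windowed.foreachRDD((rdd, time) -> {
>     rdd.saveAsNewAPIHadoopFile(targetHadoopFolder + "/" + time.milliseconds(),
>         String.class, String.class, TextOutputFormat.class);
>     return null;
> });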
>
> On Tue, 22 Mar 2016, 17:43 vetal king, <greenve...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> Thanks for your reply.
>>
>> The five-second batch and one-minute publishing interval are just a
>> representative example. What we want is to group data over a certain
>> frequency, and that frequency is configurable. One way we think it can be
>> achieved is to create a "directory" per this frequency, and inside this
>> directory create folders whenever the stream receives data. Something like
>>
>> rddByKey.saveAsNewAPIHadoopFile(directory + "-" + <current time in
>> milliseconds OR some random number>, String.class, String.class,
>> TextOutputFormat.class).
>>
>> But I think that will result in a very deeply nested directory structure,
>> and it sounds inefficient as well, since there will be a lot of small
>> files.
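>>
>> To be concrete about what I mean by grouping over a configurable
>> frequency: the folder name could be derived by flooring the timestamp to
>> the frequency, roughly like this (sketch only; "frequencyMs" stands for
>> our configured value):
>>
>> long frequencyMs = 60 * 1000L;              // configurable, e.g. one minute
>> long now = System.currentTimeMillis();      // or some other timestamp source
>> long bucket = now - (now % frequencyMs);    // floor to the frequency boundary
>> String folder = directory + "-" + bucket;   // same name for the whole interval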
>>
>> Shridhar
>>
>> On Tue, Mar 22, 2016 at 11:00 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> If you want 1 minute granularity, why not use a 1 minute batch time?
>>>
>>> Also, HDFS is not a great match for this kind of thing, because of the
>>> small files issue.
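>>>
>>> i.e. roughly (sketch; "conf" is whatever SparkConf you already build):
>>>
>>> import org.apache.spark.streaming.Durations;
>>> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>>>
>>> // one batch per minute, so each batch maps directly onto one output folder
>>> JavaStreamingContext jssc =
>>>     new JavaStreamingContext(conf, Durations.minutes(1));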
>>>
>>> On Tue, Mar 22, 2016 at 12:26 PM, vetal king <greenve...@gmail.com>
>>> wrote:
>>> > We are using Spark 1.4 for Spark Streaming. Kafka is the data source
>>> > for the Spark stream.
>>> >
>>> > Records are published on Kafka every second. Our requirement is to
>>> > store the records published on Kafka in a single folder per minute. The
>>> > stream will read records every five seconds. For instance, records
>>> > published between 12:00 PM and 12:01 PM are stored in folder "1200",
>>> > those published between 12:01 PM and 12:02 PM in folder "1201", and so
>>> > on.
>>> >
>>> > The code I wrote is as follows:
>>> >
>>> > // First, group records in the RDD by date (target folder)
>>> > stream.foreachRDD(rddWithinStream -> {
>>> >     JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
>>> >         rddWithinStream.mapToPair(t -> {
>>> >             return new Tuple2<String, String>(targetHadoopFolder, t._2());
>>> >         }).groupByKey();
>>> >     // All records are now grouped by the folder they will be stored in
>>> >
>>> >     // Create an RDD for each target folder
>>> >     for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
>>> >         JavaPairRDD<String, Iterable<String>> rddByKey =
>>> >             rddGroupedByDirectory.filter(groupedTuples -> {
>>> >                 return groupedTuples._1().equals(hadoopFolder);
>>> >             });
>>> >
>>> >         // And store it in Hadoop
>>> >         rddByKey.saveAsNewAPIHadoopFile(directory, String.class,
>>> >             String.class, TextOutputFormat.class);
>>> >     }
>>> >     return null; // the Spark 1.4 foreachRDD overload expects a Void return
>>> > });
>>> >
>>> > Since the stream processes data every five seconds,
>>> > saveAsNewAPIHadoopFile gets invoked multiple times per minute. This
>>> > causes the "part-00000" file to be overwritten every time.
>>> >
>>> > I was expecting that, in the directory specified by the "directory"
>>> > parameter, saveAsNewAPIHadoopFile would keep creating part-0000N files
>>> > even though I have a single worker node.
>>> >
>>> > Any help/alternatives are greatly appreciated.
>>> >
>>> > Thanks.
>>>
>>
>>
