I tried to implement your idea but I'm getting NullPointer exceptions from
the AvroInputFormat any Idea what I'm doing wrong?
See the code below:

public static void main(String[] args) throws Exception {

    // set up the execution environment
    final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env.fromElements("00", "01", "02","03","22","23")
            .flatMap(new FileExtractor())
            .filter(new LocationFiter())
            .flatMap(new PreProcessEndSongClean())
            .writeAsCsv(outPath);


    env.execute("something");
}

private static class FileExtractor implements
FlatMapFunction<String,EndSongCleanedPq>{

    @Override
    public void flatMap(String s, Collector<EndSongCleanedPq>
collector) throws Exception {
        AvroInputFormat<EndSongCleanedPq> avroInputFormat = new
AvroInputFormat<EndSongCleanedPq>(new
Path("hdfs:///anonym/cleaned/endsong/2016-01-01/"+s),
EndSongCleanedPq.class);
        avroInputFormat.setReuseAvroValue(false);
        while (! avroInputFormat.reachedEnd()){
                EndSongCleanedPq res = avroInputFormat.nextRecord(new
EndSongCleanedPq());
                if (res != null) collector.collect(res);
        }
    }
}


On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann <mneum...@sics.se> wrote:

> I guess I need to set the parallelism for the FlatMap to 1 to make sure I
> read one file at a time. The downside I see with this is that I will be not
> able to read in parallel from HDFS (and the files are Huge).
>
> I give it a try and see how much performance I loose.
>
> cheers Martin
>
> On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Martin,
>>
>> I think you can approximate this in an easy way like this:
>>
>>   - On the client, you traverse your directories to collect all files
>> that you need, collect all file paths in a list.
>>   - Then you have a source "env.fromElements(paths)".
>>   - Then you flatMap and in the FlatMap, run the Avro input format (open
>> it per path, then call it to get all elements)
>>
>> That gives you pretty much full control about in which order the files
>> should be processed.
>>
>> What do you think?
>>
>> Stephan
>>
>>
>> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann <mneum...@sics.se> wrote:
>>
>>> I forgot to mention I'm using an AvroInputFormat to read the file (that
>>> might be relevant how the flag needs to be applied)
>>> See the code Snipped below:
>>>
>>> DataStream<EndSongCleanedPq> inStream =
>>>         env.readFile(new AvroInputFormat<EndSongCleanedPq>(new 
>>> Path(filePath), EndSongCleanedPq.class), filePath);
>>>
>>>
>>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann <mneum...@sics.se>
>>> wrote:
>>>
>>>> The program is a DataStream program, it usually it gets the data from
>>>> kafka. It's an anomaly detection program that learns from the stream
>>>> itself. The reason I want to read from files is to test different settings
>>>> of the algorithm and compare them.
>>>>
>>>> I think I don't need to reply things in the exact order (wich is not
>>>> possible with parallel reads anyway) and I have written the program so it
>>>> can deal with out of order events.
>>>> I only need the subfolders to be processed roughly in order. Its fine
>>>> to process some stuff from 01 before everything from 00 is finished, if I
>>>> get records from all 24 subfolders at the same time things will break
>>>> though. If I set the flag will it try to get data from all sub dir's in
>>>> parallel or will it go sub dir by sub dir?
>>>>
>>>> Also can you point me to some documentation or something where I can
>>>> see how to set the Flag?
>>>>
>>>> cheers Martin
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen <se...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> Going through nested folders is pretty simple, there is a flag on the
>>>>> FileInputFormat that makes sure those are read.
>>>>>
>>>>> Tricky is the part that all "00" files should be read before the "01"
>>>>> files. If you still want parallel reads, that means you need to sync at
>>>>> some point, wait for all parallel parts to finish with the "00" work 
>>>>> before
>>>>> anyone may start with the "01" work.
>>>>>
>>>>> Is your training program a DataStream or a DataSet program?`
>>>>>
>>>>> Stephan
>>>>>
>>>>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann <mneum...@sics.se>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a streaming machine learning job that usually runs with input
>>>>>> from kafka. To tweak the models I need to run on some old data from HDFS.
>>>>>>
>>>>>> Unfortunately the data on HDFS is spread out over several subfolders.
>>>>>> Basically I have a datum with one subfolder for each hour within those 
>>>>>> are
>>>>>> the actual input files I'm interested in.
>>>>>>
>>>>>> Basically what I need is a source that goes through the subfolder in
>>>>>> order and streams the files into the program. I'm using event timestamps 
>>>>>> so
>>>>>> all files in 00 need to be processed before 01.
>>>>>>
>>>>>> Has anyone an idea on how to do this?
>>>>>>
>>>>>> cheers Martin
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to