I tried to implement your idea, but I'm getting NullPointerExceptions from the AvroInputFormat. Any idea what I'm doing wrong? See the code below:
public static void main(String[] args) throws Exception {
    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env.fromElements("00", "01", "02", "03", "22", "23")
        .flatMap(new FileExtractor())
        .filter(new LocationFiter())
        .flatMap(new PreProcessEndSongClean())
        .writeAsCsv(outPath);

    env.execute("something");
}

private static class FileExtractor implements FlatMapFunction<String, EndSongCleanedPq> {
    @Override
    public void flatMap(String s, Collector<EndSongCleanedPq> collector) throws Exception {
        AvroInputFormat<EndSongCleanedPq> avroInputFormat =
            new AvroInputFormat<EndSongCleanedPq>(
                new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s),
                EndSongCleanedPq.class);
        avroInputFormat.setReuseAvroValue(false);
        while (!avroInputFormat.reachedEnd()) {
            EndSongCleanedPq res = avroInputFormat.nextRecord(new EndSongCleanedPq());
            if (res != null) {
                collector.collect(res);
            }
        }
    }
}

On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann <mneum...@sics.se> wrote:

> I guess I need to set the parallelism for the FlatMap to 1 to make sure I
> read one file at a time. The downside I see with this is that I will not
> be able to read from HDFS in parallel (and the files are huge).
>
> I'll give it a try and see how much performance I lose.
>
> cheers Martin
>
> On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Martin,
>>
>> I think you can approximate this in an easy way like this:
>>
>> - On the client, you traverse your directories to collect all files
>>   that you need, and collect all file paths in a list.
>> - Then you have a source "env.fromElements(paths)".
>> - Then you flatMap, and in the FlatMap you run the Avro input format
>>   (open it per path, then call it to get all elements).
>>
>> That gives you pretty much full control over the order in which the
>> files are processed.
>>
>> What do you think?
>>
>> Stephan
>>
>>
>> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann <mneum...@sics.se> wrote:
>>
>>> I forgot to mention that I'm using an AvroInputFormat to read the file
>>> (that might be relevant to how the flag needs to be applied).
>>> See the code snippet below:
>>>
>>> DataStream<EndSongCleanedPq> inStream =
>>>     env.readFile(new AvroInputFormat<EndSongCleanedPq>(
>>>         new Path(filePath), EndSongCleanedPq.class), filePath);
>>>
>>>
>>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann <mneum...@sics.se> wrote:
>>>
>>>> The program is a DataStream program; it usually gets its data from
>>>> Kafka. It's an anomaly detection program that learns from the stream
>>>> itself. The reason I want to read from files is to test different
>>>> settings of the algorithm and compare them.
>>>>
>>>> I think I don't need to replay things in the exact order (which is not
>>>> possible with parallel reads anyway), and I have written the program so
>>>> it can deal with out-of-order events.
>>>> I only need the subfolders to be processed roughly in order. It's fine
>>>> to process some things from 01 before everything from 00 is finished,
>>>> but if I get records from all 24 subfolders at the same time, things
>>>> will break. If I set the flag, will it try to get data from all subdirs
>>>> in parallel, or will it go subdir by subdir?
>>>>
>>>> Also, can you point me to some documentation where I can see how to
>>>> set the flag?
>>>>
>>>> cheers Martin
>>>>
>>>>
>>>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> Going through nested folders is pretty simple; there is a flag on the
>>>>> FileInputFormat that makes sure those are read.
>>>>>
>>>>> Tricky is the part that all "00" files should be read before the "01"
>>>>> files.
>>>>> If you still want parallel reads, that means you need to sync at some
>>>>> point: wait for all parallel parts to finish with the "00" work before
>>>>> anyone may start with the "01" work.
>>>>>
>>>>> Is your training program a DataStream or a DataSet program?
>>>>>
>>>>> Stephan
>>>>>
>>>>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann <mneum...@sics.se> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a streaming machine-learning job that usually runs with input
>>>>>> from Kafka. To tweak the models I need to run it on some old data
>>>>>> from HDFS.
>>>>>>
>>>>>> Unfortunately, the data on HDFS is spread out over several
>>>>>> subfolders. Basically I have a date folder with one subfolder for
>>>>>> each hour; within those are the actual input files I'm interested in.
>>>>>>
>>>>>> What I need is a source that goes through the subfolders in order
>>>>>> and streams the files into the program. I'm using event timestamps,
>>>>>> so all files in 00 need to be processed before 01.
>>>>>>
>>>>>> Does anyone have an idea how to do this?
>>>>>>
>>>>>> cheers Martin
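[Editor's note] The first step of the approach discussed above (on the client, traverse the directories and collect all file paths in hour order before handing them to env.fromElements) can be sketched in plain Java. This is a sketch under assumptions: it walks a local directory standing in for the HDFS day folder (a real job would enumerate paths via the HDFS FileSystem API instead of java.nio), and the class and method names are hypothetical. The key point is that sorting the zero-padded hour names ("00".."23") lexicographically yields chronological order.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectPathsInOrder {

    // Traverse a day folder whose hour subfolders are named "00".."23" and
    // return the contained file paths, grouped by hour in ascending order.
    // Lexicographic order on zero-padded names matches chronological order.
    static List<String> collectPathsInOrder(Path dayDir) throws IOException {
        List<String> paths = new ArrayList<>();
        List<Path> hourDirs;
        try (Stream<Path> dirs = Files.list(dayDir)) {
            hourDirs = dirs.filter(Files::isDirectory)
                           .sorted()               // "00" < "01" < ... < "23"
                           .collect(Collectors.toList());
        }
        for (Path hourDir : hourDirs) {
            try (Stream<Path> files = Files.list(hourDir)) {
                files.filter(Files::isRegularFile)
                     .sorted()
                     .forEach(p -> paths.add(p.toString()));
            }
        }
        return paths;
    }

    public static void main(String[] args) throws IOException {
        // Demo on a temporary directory; in the real job this would be the
        // HDFS day folder, and the resulting list would be fed to
        // env.fromElements(...), with the FlatMap opening the Avro input
        // format once per path.
        Path day = Files.createTempDirectory("day");
        for (String hour : new String[] {"01", "00", "02"}) {
            Path d = Files.createDirectories(day.resolve(hour));
            Files.createFile(d.resolve("part-0.avro"));
        }
        // Prints the "00" file first, then "01", then "02".
        for (String p : collectPathsInOrder(day)) {
            System.out.println(p);
        }
    }
}
```

With a list built this way, the downstream FlatMap sees the paths in hour order; whether records from different hours interleave then depends only on the parallelism of that operator, as discussed in the thread above.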