Hi Martin, where exactly is the NullPointerException thrown? I think you didn't call the open() method of the AvroInputFormat. Maybe that's the issue.
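Untested, but I think your FileExtractor would need to configure the format, create the input splits for the path, and open the format per split before calling nextRecord(). Roughly like the sketch below (reusing your EndSongCleanedPq class and path; based on the FileInputFormat API as I remember it):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.io.AvroInputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.util.Collector;

    private static class FileExtractor
            implements FlatMapFunction<String, EndSongCleanedPq> {

        @Override
        public void flatMap(String s, Collector<EndSongCleanedPq> collector)
                throws Exception {
            AvroInputFormat<EndSongCleanedPq> format =
                new AvroInputFormat<EndSongCleanedPq>(
                    new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s),
                    EndSongCleanedPq.class);
            format.setReuseAvroValue(false);
            format.configure(new Configuration()); // no special parameters needed here
            // One or more splits per file in the hour folder; read them in order.
            for (FileInputSplit split : format.createInputSplits(1)) {
                format.open(split); // this is the call that was missing
                while (!format.reachedEnd()) {
                    EndSongCleanedPq res = format.nextRecord(new EndSongCleanedPq());
                    if (res != null) {
                        collector.collect(res);
                    }
                }
                format.close();
            }
        }
    }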
On Thu, Feb 18, 2016 at 5:01 PM, Martin Neumann <mneum...@sics.se> wrote:

> I tried to implement your idea, but I'm getting NullPointerExceptions from
> the AvroInputFormat. Any idea what I'm doing wrong?
> See the code below:
>
> public static void main(String[] args) throws Exception {
>
>     // set up the execution environment
>     final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     env.fromElements("00", "01", "02", "03", "22", "23")
>         .flatMap(new FileExtractor())
>         .filter(new LocationFiter())
>         .flatMap(new PreProcessEndSongClean())
>         .writeAsCsv(outPath);
>
>     env.execute("something");
> }
>
> private static class FileExtractor
>         implements FlatMapFunction<String, EndSongCleanedPq> {
>
>     @Override
>     public void flatMap(String s, Collector<EndSongCleanedPq> collector)
>             throws Exception {
>         AvroInputFormat<EndSongCleanedPq> avroInputFormat =
>             new AvroInputFormat<EndSongCleanedPq>(
>                 new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s),
>                 EndSongCleanedPq.class);
>         avroInputFormat.setReuseAvroValue(false);
>         while (!avroInputFormat.reachedEnd()) {
>             EndSongCleanedPq res =
>                 avroInputFormat.nextRecord(new EndSongCleanedPq());
>             if (res != null) collector.collect(res);
>         }
>     }
> }
>
> On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann <mneum...@sics.se> wrote:
>
>> I guess I need to set the parallelism for the FlatMap to 1 to make sure I
>> read one file at a time. The downside I see with this is that I will not
>> be able to read in parallel from HDFS (and the files are huge).
>>
>> I'll give it a try and see how much performance I lose.
>>
>> cheers Martin
>>
>> On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Martin,
>>>
>>> I think you can approximate this in an easy way like this:
>>>
>>> - On the client, traverse your directories to collect all the files
>>>   you need, and collect the file paths in a list.
>>> - Then create a source with "env.fromElements(paths)".
>>> - Then flatMap, and in the FlatMap run the Avro input format (open it
>>>   per path, then call it to get all elements).
>>>
>>> That gives you pretty much full control over the order in which the
>>> files are processed.
>>>
>>> What do you think?
>>>
>>> Stephan
>>>
>>> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann <mneum...@sics.se>
>>> wrote:
>>>
>>>> I forgot to mention that I'm using an AvroInputFormat to read the file
>>>> (which might be relevant to how the flag needs to be applied).
>>>> See the code snippet below:
>>>>
>>>> DataStream<EndSongCleanedPq> inStream =
>>>>     env.readFile(new AvroInputFormat<EndSongCleanedPq>(
>>>>         new Path(filePath), EndSongCleanedPq.class), filePath);
>>>>
>>>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann <mneum...@sics.se>
>>>> wrote:
>>>>
>>>>> The program is a DataStream program; it usually gets its data from
>>>>> Kafka. It's an anomaly detection program that learns from the stream
>>>>> itself. The reason I want to read from files is to test different
>>>>> settings of the algorithm and compare them.
>>>>>
>>>>> I think I don't need to replay things in the exact order (which is
>>>>> not possible with parallel reads anyway), and I have written the
>>>>> program so it can deal with out-of-order events.
>>>>> I only need the subfolders to be processed roughly in order. It's
>>>>> fine to process some stuff from 01 before everything from 00 is
>>>>> finished; if I get records from all 24 subfolders at the same time,
>>>>> things will break though.
>>>>> If I set the flag, will it try to get data from all subdirectories
>>>>> in parallel, or will it go subdirectory by subdirectory?
>>>>>
>>>>> Also, can you point me to some documentation where I can see how to
>>>>> set the flag?
>>>>>
>>>>> cheers Martin
>>>>>
>>>>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> Going through nested folders is pretty simple; there is a flag on
>>>>>> the FileInputFormat that makes sure those are read.
>>>>>>
>>>>>> The tricky part is that all "00" files should be read before the
>>>>>> "01" files. If you still want parallel reads, that means you need
>>>>>> to sync at some point and wait for all parallel parts to finish
>>>>>> with the "00" work before anyone may start with the "01" work.
>>>>>>
>>>>>> Is your training program a DataStream or a DataSet program?
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann <mneum...@sics.se>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a streaming machine learning job that usually runs with
>>>>>>> input from Kafka. To tweak the models I need to run it on some old
>>>>>>> data from HDFS.
>>>>>>>
>>>>>>> Unfortunately, the data on HDFS is spread out over several
>>>>>>> subfolders. Basically, I have one folder per date with one
>>>>>>> subfolder for each hour; within those are the actual input files
>>>>>>> I'm interested in.
>>>>>>>
>>>>>>> What I need is a source that goes through the subfolders in order
>>>>>>> and streams the files into the program. I'm using event
>>>>>>> timestamps, so all files in 00 need to be processed before 01.
>>>>>>>
>>>>>>> Does anyone have an idea how to do this?
>>>>>>>
>>>>>>> cheers Martin
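PS: Regarding the flag question further down in this thread: as far as I
know, the nested-folder switch Stephan mentions is the
"recursive.file.enumeration" configuration parameter, which
FileInputFormat picks up in configure(). A minimal, untested sketch,
reusing your EndSongCleanedPq class and the parent HDFS path:

    Configuration parameters = new Configuration();
    parameters.setBoolean("recursive.file.enumeration", true);

    AvroInputFormat<EndSongCleanedPq> format =
        new AvroInputFormat<EndSongCleanedPq>(
            new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/"),
            EndSongCleanedPq.class);
    format.configure(parameters); // enables recursive traversal of subfolders

In the DataSet API you would instead pass the parameters with
.withParameters(parameters) on the operator. Note that the flag alone
enumerates the splits from all hour subfolders together, so it does not
by itself give you the folder-by-folder ordering you need.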