Hi Yassine,

thanks, that explains it :)
Best regards,
Felix

On Nov 7, 2016 21:28, "Yassine MARZOUGUI" <y.marzou...@mindlytix.com> wrote:

> Hi Felix,
>
> As I see in kddcup.newtestdata_small_unlabeled_index
> <https://github.com/FelixNeutatz/CluStream/blob/master/flink-java-project/src/main/resources/data/kddcup.newtestdata_small_unlabeled_index>,
> the first field of connectionRecords (splits[0]) is unique for each
> record, so keyBy(0) logically partitions your stream by that field and
> each partition contains only one element. The countWindow(2) therefore
> never fires, because no partition ever reaches 2 elements. That's why
> your files stay empty.
>
> Could you please go into more detail about what the expected output is?
> Then we might be able to figure out the proper way to achieve it.
>
> Best,
> Yassine
>
> 2016-11-07 19:18 GMT+01:00 Felix Neutatz <neut...@googlemail.com>:
>
>> Hi Till,
>>
>> the mapper solution makes sense :)
>>
>> Unfortunately, in my case it was not a typo in the path. I checked and
>> saw that the records are read.
>>
>> You can find the whole program here:
>> https://github.com/FelixNeutatz/CluStream/blob/master/flink-java-project/src/main/java/org/apache/flink/clustream/StreamingJobIndex.java
>>
>> I am happy about any ideas.
>>
>> Best regards,
>> Felix
>>
>> 2016-11-07 16:15 GMT+01:00 Till Rohrmann <trohrm...@apache.org>:
>>
>>> Hi Felix,
>>>
>>> I'm not sure whether grouping/keyBy by processing time makes semantic
>>> sense; the result can be anything depending on the execution order.
>>> Therefore there is no built-in mechanism to group by processing time.
>>> But you can always write a mapper which assigns the current
>>> processing time to the stream record and use this field for grouping.
>>>
>>> Concerning your second problem, could you check the path of the file?
>>> At the moment Flink fails silently if the path is not valid. It might
>>> be that you have a simple typo in the path. I've opened an issue to
>>> fix this [1].
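[Editor's note: Yassine's point above can be illustrated without a Flink cluster. The following is a minimal standalone sketch (plain Java, no Flink dependency; all class and method names are hypothetical) that models keyBy + countWindow semantics: records are buffered per key, and a window only fires once a key's buffer reaches the window size. With a unique key per record, as with the index column, no window ever fires.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of keyBy(...).countWindow(windowSize):
// group records by key, fire a window when a key's count reaches windowSize.
public class CountWindowSketch {
    static List<Integer> run(long[] keys, int windowSize) {
        Map<Long, Integer> buffers = new HashMap<>(); // per-key element count
        List<Integer> fired = new ArrayList<>();      // counts emitted by fired windows
        for (long key : keys) {
            int n = buffers.merge(key, 1, Integer::sum);
            if (n == windowSize) {   // window fires and the buffer is cleared
                fired.add(n);
                buffers.put(key, 0);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Unique keys (like the index column): no window ever fires.
        System.out.println(CountWindowSketch.run(new long[]{1, 2, 3, 4}, 2)); // []
        // Repeated keys: windows fire as expected.
        System.out.println(CountWindowSketch.run(new long[]{1, 1, 2, 2}, 2)); // [2, 2]
    }
}
```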
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-5027
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sun, Nov 6, 2016 at 12:36 PM, Felix Neutatz <neut...@googlemail.com> wrote:
>>>
>>>> Hi everybody,
>>>>
>>>> I finally reached streaming territory. For a student project I want
>>>> to implement CluStream for Flink. I guess this is especially
>>>> interesting for trying out queryable state :)
>>>>
>>>> But I have problems with the first steps. My input data is a csv
>>>> file of records. For a start I just want to window this csv. I don't
>>>> want to use AllWindows because it's not parallelizable.
>>>>
>>>> So my first question is: can I window by processing time, like this?
>>>>
>>>> connectionRecordsT.keyBy(processing_time).timeWindow(Time.milliseconds(1000L))
>>>>
>>>> I didn't find a way, so I added an index column to the csv and tried
>>>> to use a countWindow:
>>>>
>>>> DataStreamSource<String> source = env.readTextFile(file.getPath());
>>>>
>>>> DataStream<Tuple2<Long, Vector>> connectionRecords =
>>>>     source.map(new MapToVector()).setParallelism(4);
>>>>
>>>> connectionRecords.keyBy(0).countWindow(10).apply(
>>>>     new WindowFunction<Tuple2<Long, Vector>, Tuple1<Integer>, Tuple, GlobalWindow>() {
>>>>         public void apply(Tuple tuple,
>>>>                           GlobalWindow window,
>>>>                           Iterable<Tuple2<Long, Vector>> values,
>>>>                           Collector<Tuple1<Integer>> out) throws Exception {
>>>>             // count the elements in this window
>>>>             int sum = 0;
>>>>             for (Tuple2<Long, Vector> t : values) {
>>>>                 sum += 1;
>>>>             }
>>>>             out.collect(new Tuple1<Integer>(sum));
>>>>         }
>>>>     }).writeAsCsv("text");
>>>>
>>>> To check whether everything works I just count the elements per
>>>> window and write them into a csv file.
>>>>
>>>> Flink generates the files but all are empty. Can you tell me what I
>>>> did wrong?
>>>>
>>>> Best regards,
>>>>
>>>> Felix
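[Editor's note: Till's mapper suggestion above boils down to attaching a processing-time-derived key to each record so that records seen within the same interval share a key. The sketch below (plain Java, no Flink types; the class and helper names are hypothetical) shows only that bucketing step; in the real job it would live inside a Flink MapFunction whose output field is then used in keyBy(...).]

```java
// Hypothetical sketch of Till's idea: derive a grouping key from the
// processing time at which a record is mapped, bucketed to the window length.
public class ProcessingTimeKey {
    // Map a processing timestamp (ms) to a window bucket index.
    static long bucketFor(long processingTimeMillis, long windowMillis) {
        return processingTimeMillis / windowMillis;
    }

    public static void main(String[] args) {
        long window = 1000L; // 1-second windows, as in the original job
        // Two records mapped 200 ms apart share a bucket (and thus a key)...
        System.out.println(bucketFor(5_000L, window) == bucketFor(5_200L, window)); // true
        // ...while a record mapped a second later lands in the next bucket.
        System.out.println(bucketFor(6_200L, window)); // 6
    }
}
```

Note that, as Till says, this key reflects execution order rather than any property of the data, so the resulting grouping is non-deterministic across runs.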