Re: The way to itearte instances in AllWindowFunction in current Master branch

2016-02-26 Thread Aljoscha Krettek
Hi Hung, after some discussion the way that window functions are used will change back to the way it was in 0.10.x, i.e. the Iterable is always part of the apply function. Sorry for the inconvenience this has caused. Cheers, Aljoscha > On 26 Feb 2016, at 11:48, Aljoscha Krettek
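
For readers skimming the thread, here is a rough sketch of what "the Iterable is part of the apply function" means in practice. It is plain Java with no Flink on the classpath. The names IterableWindowSketch, IterableWindowFunction, CountElements and countWindow are hypothetical stand-ins, not Flink classes, and the Consumer parameter only plays the role that a collector would play.

```java
import java.util.List;
import java.util.function.Consumer;

class IterableWindowSketch {

    /** Hypothetical stand-in (not a Flink interface): apply receives all elements of one window. */
    interface IterableWindowFunction<IN, OUT> {
        void apply(Iterable<IN> values, Consumer<OUT> out);
    }

    /** Iterates over the window contents and emits exactly one result per window. */
    static class CountElements<IN> implements IterableWindowFunction<IN, Long> {
        @Override
        public void apply(Iterable<IN> values, Consumer<Long> out) {
            long count = 0;
            for (IN ignored : values) {
                count++;
            }
            out.accept(count);
        }
    }

    /** Demo helper: runs CountElements over one in-memory "window" and returns the emitted count. */
    static long countWindow(List<String> window) {
        long[] result = new long[1];
        new CountElements<String>().apply(window, c -> result[0] = c);
        return result[0];
    }
}
```

The point of the shape is that the function body sees the whole window at once, so it can aggregate and emit a single value instead of reacting to each element separately.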

Re: The way to itearte instances in AllWindowFunction in current Master branch

2016-02-26 Thread Aljoscha Krettek
Hi, yes that seems to have been the issue. Math.max() is used to ensure that the timestamp never decreases, because a watermark is not allowed to go backwards. Cheers, Aljoscha > On 26 Feb 2016, at 11:11, HungChang wrote: > > Ah! My incorrect code segment made the
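
The Math.max() remark can be illustrated with a small sketch in plain Java, with no Flink dependency. The class and method names (MonotonicWatermarkTracker, observe, currentWatermark) are hypothetical and only show the bookkeeping. They are not the Flink API, and the real assigner in the thread may differ.

```java
/** Hypothetical helper: tracks the highest event timestamp seen so far. */
class MonotonicWatermarkTracker {

    // Long.MIN_VALUE means that no event has been observed yet.
    private long maxTimestamp = Long.MIN_VALUE;

    /** Records an event timestamp and returns it unchanged. */
    long observe(long eventTimestamp) {
        // Math.max keeps the stored value from moving backwards on out-of-order events.
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    /** The value a watermark could be derived from; it never decreases. */
    long currentWatermark() {
        return maxTimestamp;
    }
}
```

If the stored timestamp were simply overwritten by each event, a late event would pull the watermark back, which is exactly what the Math.max() call prevents.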

Re: The way to itearte instances in AllWindowFunction in current Master branch

2016-02-25 Thread Aljoscha Krettek
Hi Hung, I see one thing that could explain the problem: the timestamp assigner should look like this: new AssignerWithPeriodicWatermarks() { long curTimeStamp; @Override public long extractTimestamp(BizEvent biz, long currentTimestamp)

Re: The way to itearte instances in AllWindowFunction in current Master branch

2016-02-25 Thread HungChang
An update. The following situation works as expected: the data arrives after the Flink job starts to execute. 1> (2016-02-25T17:46:25.00,13) 2> (2016-02-25T17:46:40.00,16) 3> (2016-02-25T17:46:50.00,11) 4> (2016-02-25T17:47:10.00,12) But for data that arrived a long time before: strange behavior

Re: The way to itearte instances in AllWindowFunction in current Master branch

2016-02-25 Thread HungChang
Thank you for your reply. Please let me know if other classes or the full code are needed. /** * Count how many total events */ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4, env_config); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Re: The way to itearte instances in AllWindowFunction in current Master branch

2016-02-25 Thread Aljoscha Krettek
Hi Hung, could you maybe post a more complete snippet of your program? This would allow me to figure out why the output changes between versions 0.10 and 1.0. @Matthias: The signature was changed to also allow window functions that don’t take an Iterable. For example, when doing

Re: The way to itearte instances in AllWindowFunction in current Master branch

2016-02-25 Thread Matthias J. Sax
Just out of curiosity: why was it changed like this? Specifying "Iterable<...>" as type in AllWindowFunction seems rather unintuitive... -Matthias On 02/25/2016 01:58 PM, Aljoscha Krettek wrote: > Hi, > yes that is true. The way you would now write such a function is this: > > private static

Re: The way to itearte instances in AllWindowFunction in current Master branch

2016-02-25 Thread HungChang
Thank you. I can be sure this way is correct now. I have tried this, but the windows are still not aggregating. Instead, the AllWindowFunction only works like a flatMap. Shouldn't it produce output only once per window range? The strangest part is that the first output is aggregated while the others are not.

Re: The way to itearte instances in AllWindowFunction in current Master branch

2016-02-25 Thread Aljoscha Krettek
Hi, yes that is true. The way you would now write such a function is this: private static class MyIterableFunction implements AllWindowFunction>, Tuple2, TimeWindow> { private static final long serialVersionUID = 1L; @Override public

Re: The way to itearte instances in AllWindowFunction in current Master branch

2016-02-25 Thread HungChang
Thank you for your reply. The following in the current master does not look iterable, because the parameter is IN rather than Iterable. So I still have a problem iterating... @Public public interface AllWindowFunction extends Function, Serializable { /**

Re: The way to itearte instances in AllWindowFunction in current Master branch

2016-02-25 Thread Aljoscha Krettek
Hi Hung, you are right, the generic parameters of AllWindowFunction changed from Iterable to IN. However, in the apply function on AllWindowedStream the parameter changed from IN to Iterable. What this means is that you can still do: windowed.apply(new MyIterableWindowFunction()) and iterate