Thanks for your reply. I have submitted the PR: https://github.com/apache/flink/pull/18982

Looking forward to further discussion.

Best wishes
Martijn Visser <martijnvis...@apache.org> wrote on Fri, 4 Mar 2022 at 17:09:
> Hi Deng Ziqi & Lin Wanni & Guo Yuanfang,
>
> First of all, I wanted to let you know that I think the ticket that you've
> created is one of the most extensive and complete tickets I've seen. Thank
> you very much for the effort on this!
>
> Based on your input I think it indeed looks like this should be addressed.
> Perhaps there are other maintainers who are more familiar with this code
> who can give a more in-depth answer.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
>
> On Fri, 4 Mar 2022 at 08:00, 邓子琦 <dzq3210...@gmail.com> wrote:
>
> > I have created an issue on Jira:
> > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-26334
> >
> > Issue
> >
> > Hello!
> >
> > While studying the Flink source code, we found a problem with its
> > algorithm for calculating the window start time. When
> > *timestamp - offset + windowSize < 0* (and is not an exact multiple of
> > windowSize), the element is incorrectly assigned to a window whose start
> > is one windowSize too large, i.e. a window that begins after the
> > element's own timestamp.
> >
> > The problem is in
> > *org.apache.flink.streaming.api.windowing.windows.TimeWindow*:
> >
> > public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
> >     return timestamp - (timestamp - offset + windowSize) % windowSize;
> > }
> >
> > We believe this violates the constraint between time and window. *That
> > is, an element should fall within a window whose start time is less than
> > or equal to its own timestamp and whose end time is greater than its own
> > timestamp.* However, when *timestamp - offset + windowSize < 0*, *the
> > element falls into a future time window.*
> >
> > *You can reproduce the bug with the code at the end of this post.*
> >
> > Solution
> >
> > In fact, the original algorithm works correctly in Python; the key to
> > this problem is how the programming language handles the remainder
> > operation. In Java, the result of % takes the sign of the dividend, so a
> > negative *timestamp - offset + windowSize* yields a negative remainder.
> > In Python, % takes the sign of the divisor, so the remainder is never
> > negative for a positive windowSize.
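A small sketch of that remainder difference uses the -7000 element from the reproduction (offset 0, 5-second windows). `Math.floorMod` serves as Java's counterpart to Python's `%`. The class and method names are illustrative and do not come from Flink.

```java
public class RemainderSemantics {

    // Current TimeWindow expression. Java's % truncates toward zero, so the
    // remainder takes the sign of the dividend and can be negative.
    static long startWithPercent(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Same expression with Math.floorMod, whose result takes the sign of the
    // divisor (the behaviour of Python's %). Illustration only, not Flink code.
    static long startWithFloorMod(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset + windowSize, windowSize);
    }

    public static void main(String[] args) {
        long timestamp = -7000L, offset = 0L, windowSize = 5000L;
        long dividend = timestamp - offset + windowSize; // -2000
        System.out.println("Java %:        " + dividend % windowSize);               // -2000
        System.out.println("Math.floorMod: " + Math.floorMod(dividend, windowSize)); // 3000
        System.out.println("start with %:        " + startWithPercent(timestamp, offset, windowSize));  // -5000
        System.out.println("start with floorMod: " + startWithFloorMod(timestamp, offset, windowSize)); // -10000
    }
}
```

With the truncating remainder the start is -5000, giving the window [-5000, 0), which does not contain -7000. The floored remainder gives -10000, whose window does contain it.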
> >
> > We finally concluded that it should be changed to the following
> > algorithm:
> >
> > public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
> >     return timestamp
> >             - (timestamp - offset) % windowSize
> >             - (windowSize & (timestamp - offset) >> 63);
> > }
> >
> > *windowSize & (timestamp - offset) >> 63* subtracts windowSize from the
> > overall result when *timestamp - offset < 0* and does nothing otherwise.
> > Because >> binds more tightly than &, the arithmetic shift by 63 runs
> > first and yields -1 (all bits set) for a negative value and 0 otherwise,
> > so the & produces either windowSize or 0. This way we can handle both
> > positive and negative timestamps.
> >
> > Finally, the element is assigned to the correct window.
> >
> > This code passes the current unit tests.
> >
> > getWindowStartWithOffset methods in other packages
> >
> > I think *getWindowStartWithOffset* is probably implemented in several
> > places. We searched the project for this method and found that the
> > negative-timestamp problem is already handled in *flink.table*.
> >
> > Below is that source code, from
> > *org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping*:
> >
> > private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
> >     long remainder = (timestamp - offset) % windowSize;
> >     // handle both positive and negative cases
> >     if (remainder < 0) {
> >         return timestamp - (remainder + windowSize);
> >     } else {
> >         return timestamp - remainder;
> >     }
> > }
> >
> > Can we make a pull request?
> >
> > If the community deems this change necessary, we hope this task can be
> > handed over to us. Our members are all recent graduates, and contributing
> > code to Flink would be a great encouragement for us.
> >
> > Thank you so much!
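The proposed bit-twiddling formula has one edge case that may be worth checking. Its mask is taken from the sign of `timestamp - offset`, not from the sign of the remainder. So when `timestamp - offset` is a negative exact multiple of `windowSize`, the remainder is 0 but `windowSize` is still subtracted.

The sketch below compares three versions:
- the proposed formula;
- the `WindowsGrouping` version quoted above;
- a hypothetical branch-free variant (`maskOnRemainder`, not from Flink) that masks on the remainder's sign instead.

The class name is also illustrative. The inputs are timestamp -10000, offset 0 and 5-second windows.

```java
public class WindowStartVariants {

    // Formula proposed in the mail. Java's >> binds tighter than &, so this is
    // windowSize & ((timestamp - offset) >> 63): the shift gives -1 (all bits
    // set) when timestamp - offset < 0 and 0 otherwise.
    static long proposed(long timestamp, long offset, long windowSize) {
        return timestamp
                - (timestamp - offset) % windowSize
                - (windowSize & (timestamp - offset) >> 63);
    }

    // Remainder-based version quoted from WindowsGrouping.
    static long windowsGrouping(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        if (remainder < 0) {
            return timestamp - (remainder + windowSize);
        } else {
            return timestamp - remainder;
        }
    }

    // Hypothetical branch-free variant: mask on the sign of the remainder,
    // so nothing extra is subtracted when the remainder is exactly 0.
    static long maskOnRemainder(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        return timestamp - remainder - (windowSize & (remainder >> 63));
    }

    public static void main(String[] args) {
        long timestamp = -10000L, offset = 0L, windowSize = 5000L;
        System.out.println(proposed(timestamp, offset, windowSize));        // -15000
        System.out.println(windowsGrouping(timestamp, offset, windowSize)); // -10000
        System.out.println(maskOnRemainder(timestamp, offset, windowSize)); // -10000
    }
}
```

For a positive windowSize, `maskOnRemainder` returns the same value as `windowsGrouping` for every input. The proposed formula differs from both only when `timestamp - offset` is a negative multiple of `windowSize`. At -10000, for example, it yields -15000, and the window [-15000, -10000) does not contain the element.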
> >
> > From Deng Ziqi & Lin Wanni & Guo Yuanfang
> >
> >
> > ===========================================
> > Reproduce
> >
> > import java.util.TimeZone;
> >
> > import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
> > import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> > import org.apache.flink.api.java.tuple.Tuple2;
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> > import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> > import org.apache.flink.streaming.api.windowing.time.Time;
> > import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> > import org.apache.flink.util.Collector;
> >
> > /* output
> > WindowStart: -15000 ExactSize:1 (a,-17000)
> > WindowStart: -10000 ExactSize:1 (b,-12000)
> > WindowStart: -5000 ExactSize:2 (c,-7000)
> > WindowStart: -5000 ExactSize:2 (d,-2000)
> > WindowStart: 0 ExactSize:1 (e,3000)
> > WindowStart: 5000 ExactSize:1 (f,8000)
> > WindowStart: 10000 ExactSize:1 (g,13000)
> > WindowStart: 15000 ExactSize:1 (h,18000)
> > */
> > public class Example {
> >     public static void main(String[] args) throws Exception {
> >
> >         final TimeZone timeZone = TimeZone.getTimeZone("GMT+0");
> >         TimeZone.setDefault(timeZone);
> >         StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.getExecutionEnvironment();
> >         env
> >                 .setParallelism(1)
> >                 .fromElements(
> >                         Tuple2.of("a", -17 * 1000L),
> >                         Tuple2.of("b", -12 * 1000L),
> >                         Tuple2.of("c", -7 * 1000L),
> >                         Tuple2.of("d", -2 * 1000L),
> >                         Tuple2.of("e", 3 * 1000L),
> >                         Tuple2.of("f", 8 * 1000L),
> >                         Tuple2.of("g", 13 * 1000L),
> >                         Tuple2.of("h", 18 * 1000L)
> >                 )
> >                 .assignTimestampsAndWatermarks(
> >                         WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
> >                                 .withTimestampAssigner(
> >                                         new SerializableTimestampAssigner<Tuple2<String, Long>>() {
> >                                             @Override
> >                                             public long extractTimestamp(Tuple2<String, Long> element, long l) {
> >                                                 // the second field is the event time in milliseconds
> >                                                 return element.f1;
> >                                             }
> >                                         }
> >                                 )
> >                 )
> >                 // a constant key puts all elements into the same keyed window
> >                 .keyBy(r -> 1)
> >                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
> >                 .process(
> >                         new ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>() {
> >                             @Override
> >                             public void process(
> >                                     Integer key,
> >                                     Context context,
> >                                     Iterable<Tuple2<String, Long>> elements,
> >                                     Collector<String> out) throws Exception {
> >                                 // emit each element with the start and size of the window it landed in
> >                                 for (Tuple2<String, Long> element : elements) {
> >                                     out.collect("WindowStart: " + context.window().getStart()
> >                                             + "\tExactSize:" + elements.spliterator().getExactSizeIfKnown()
> >                                             + "\t" + element);
> >                                 }
> >                             }
> >                         }
> >                 )
> >                 .print();
> >         env.execute();
> >     }
> > }
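The WindowStart column of the output above can be replayed without running a Flink job. This sketch assumes, consistent with that output, that `TumblingEventTimeWindows.of(Time.seconds(5))` ends up calling the quoted `getWindowStartWithOffset` with an offset of 0 and a window size of 5000 ms. The class and helper names are hypothetical.

```java
public class ReplayReproduction {

    // Current TimeWindow formula, copied from the mail.
    static long currentStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // A TimeWindow covers the half-open interval [start, start + windowSize).
    static boolean contains(long start, long windowSize, long timestamp) {
        return start <= timestamp && timestamp < start + windowSize;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c", "d", "e", "f", "g", "h"};
        long[] timestamps = {-17000L, -12000L, -7000L, -2000L, 3000L, 8000L, 13000L, 18000L};
        long windowSize = 5000L; // Time.seconds(5)
        long offset = 0L;        // assumption: no window offset configured
        for (int i = 0; i < keys.length; i++) {
            long start = currentStart(timestamps[i], offset, windowSize);
            System.out.println("(" + keys[i] + "," + timestamps[i] + ") WindowStart: " + start
                    + " contains: " + contains(start, windowSize, timestamps[i]));
        }
    }
}
```

The printed starts match the output above. `contains` is false for a, b and c, the three elements for which timestamp - offset + windowSize is negative.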