Thanks for your reply. I have submitted the PR:
https://github.com/apache/flink/pull/18982
Looking forward to more discussion.
Best wishes

Martijn Visser <martijnvis...@apache.org> wrote on Fri, 4 Mar 2022 at 17:09:

> Hi Deng Ziqi & Lin Wanni & Guo Yuanfang,
>
> First of all, I wanted to let you know that I think the ticket that you've
> created is one of the most extensive and complete tickets I've seen. Thank
> you very much for the effort on this!
>
> Based on your input I think it indeed looks like this should be addressed.
> Perhaps there are other maintainers who are more familiar with this code
> who can give a more in-depth answer.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
>
> On Fri, 4 Mar 2022 at 08:00, 邓子琦 <dzq3210...@gmail.com> wrote:
>
> > I have created an issue on Jira:
> > https://issues.apache.org/jira/projects/FLINK/issues/FLINK-26334
> > Issue
> >
> >         Hello!
> >
> >         While studying the Flink source code, we found a problem in the
> > algorithm that calculates the window start time. When *timestamp - offset +
> > windowSize < 0*, the element is incorrectly assigned to a window whose
> > start time is later than the element's own timestamp.
> >
> >         The problem is in
> > *org.apache.flink.streaming.api.windowing.windows.TimeWindow*
> >
> > public static long getWindowStartWithOffset(long timestamp, long
> > offset, long windowSize) {
> >     return timestamp - (timestamp - offset + windowSize) % windowSize;
> > }
> >
> >         We believe that this violates the constraint between time and
> > window. *That is, an element should fall within a window whose start time
> > is no later than its own timestamp and whose end time is later than its
> > own timestamp.* Currently, however, when *timestamp - offset + windowSize
> > < 0*, *the element falls into a future time window.*
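
To make the failure concrete, here is a minimal sketch that copies the arithmetic of the method quoted above into a standalone class and checks whether each timestamp lies inside its assigned window. The class name `OriginalWindowStart` and the sample timestamps are only illustrative, and it assumes an offset of 0 and a 5-second window:

```java
final class OriginalWindowStart {
    // Same arithmetic as the TimeWindow method quoted above.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 5000L;
        for (long ts : new long[] {-17000L, -12000L, -7000L, -2000L, 3000L}) {
            long start = getWindowStartWithOffset(ts, 0L, windowSize);
            // A tumbling window covers [start, start + windowSize).
            boolean contains = start <= ts && ts < start + windowSize;
            System.out.println(
                    ts + " -> [" + start + ", " + (start + windowSize) + ") contains=" + contains);
        }
        // Prints:
        // -17000 -> [-15000, -10000) contains=false
        // -12000 -> [-10000, -5000) contains=false
        // -7000 -> [-5000, 0) contains=false
        // -2000 -> [-5000, 0) contains=true
        // 3000 -> [0, 5000) contains=true
    }
}
```
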
> >
> >        *You can reproduce the bug with the code at the end of the post.*
> >
> > Solution
> >
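> >         In fact, the original algorithm works correctly in Python. The key
> > to this problem is how the programming language handles the remainder
> > operation: in Java, the result of % takes the sign of the dividend, whereas
> > in Python it takes the sign of the divisor.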
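
The difference in remainder semantics can be seen with a short sketch. `Math.floorMod` is a standard JDK method that behaves like Python's `%` for a positive divisor. The helper `windowStartWithFloorMod` is a hypothetical alternative shown only for illustration; it is not the change proposed in this thread:

```java
final class RemainderDemo {
    // Hypothetical alternative, for illustration only: floorMod never returns
    // a negative value when windowSize is positive.
    static long windowStartWithFloorMod(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    public static void main(String[] args) {
        // Java's % keeps the sign of the dividend.
        System.out.println(-7000L % 5000L);               // -2000
        // floorMod keeps the sign of the divisor, like Python's %.
        System.out.println(Math.floorMod(-7000L, 5000L)); // 3000
        System.out.println(windowStartWithFloorMod(-7000L, 0L, 5000L)); // -10000
    }
}
```
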
> >
> >         We propose changing it to the following algorithm.
> >
> > public static long getWindowStartWithOffset(long timestamp, long
> > offset, long windowSize) {
> >     return timestamp
> >             - (timestamp - offset) % windowSize
> >             - (windowSize & (timestamp - offset) >> 63);
> > }
> >
> >         The term *windowSize & (timestamp - offset) >> 63* subtracts
> > windowSize from the overall result when *timestamp - offset < 0* and does
> > nothing otherwise: the arithmetic shift *>> 63* yields -1 (all bits set)
> > for a negative value and 0 otherwise. This way we can handle both positive
> > and negative timestamps.
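
A small sketch of how the sign mask behaves, assuming an offset of 0 and a 5-second window. The class and method names (`SignMaskDemo`, `signMask`, `proposed`) are made up for this example. Note that in Java `>>` binds more tightly than `&`, so the expression is read as `windowSize & ((timestamp - offset) >> 63)`:

```java
final class SignMaskDemo {
    // Arithmetic right shift by 63 copies the sign bit into every bit of a long.
    static long signMask(long value) {
        return value >> 63;
    }

    // The formula proposed above, unchanged.
    static long proposed(long timestamp, long offset, long windowSize) {
        return timestamp
                - (timestamp - offset) % windowSize
                - (windowSize & (timestamp - offset) >> 63);
    }

    public static void main(String[] args) {
        System.out.println(signMask(-7000L));         // -1
        System.out.println(signMask(3000L));          // 0
        System.out.println(5000L & signMask(-7000L)); // 5000
        System.out.println(5000L & signMask(3000L));  // 0
        System.out.println(proposed(-7000L, 0L, 5000L)); // -10000
        System.out.println(proposed(3000L, 0L, 5000L));  // 0
    }
}
```
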
> >
> >         Finally, the element can be assigned to the correct window.
> >
> >         This code passes the current unit tests.
> >
> > getWindowStartWithOffset methods in other packages
> >
> >         We suspected that *getWindowStartWithOffset* is implemented in
> > several places. We searched the project for this method and found that
> > negative timestamps are already handled in *flink.table*.
> >
> >         Below is their source code.
> >
> > *org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping*
> >
> > private long getWindowStartWithOffset(long timestamp, long offset,
> > long windowSize) {
> >     long remainder = (timestamp - offset) % windowSize;
> >     // handle both positive and negative cases
> >     if (remainder < 0) {
> >         return timestamp - (remainder + windowSize);
> >     } else {
> >         return timestamp - remainder;
> >     }
> > }
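
For comparison, here is a sketch that runs the mask-based formula and the branch-based `WindowsGrouping` logic side by side, assuming an offset of 0 and a 5-second window. The class and method names are illustrative only. By my reading of Java's operator semantics, the two agree except when *timestamp - offset* is negative and an exact multiple of windowSize: the remainder is then 0, but the mask still subtracts windowSize. That boundary may be worth covering with a unit test.

```java
final class WindowStartComparison {
    // Mask-based formula as proposed earlier in this thread.
    static long maskBased(long timestamp, long offset, long windowSize) {
        return timestamp
                - (timestamp - offset) % windowSize
                - (windowSize & (timestamp - offset) >> 63);
    }

    // Branch-based logic as quoted from WindowsGrouping.
    static long branchBased(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // handle both positive and negative cases
        if (remainder < 0) {
            return timestamp - (remainder + windowSize);
        } else {
            return timestamp - remainder;
        }
    }

    public static void main(String[] args) {
        for (long ts : new long[] {-7000L, -5000L, 0L, 3000L}) {
            System.out.println(
                    "ts=" + ts
                            + " mask=" + maskBased(ts, 0L, 5000L)
                            + " branch=" + branchBased(ts, 0L, 5000L));
        }
        // Prints:
        // ts=-7000 mask=-10000 branch=-10000
        // ts=-5000 mask=-10000 branch=-5000
        // ts=0 mask=0 branch=0
        // ts=3000 mask=0 branch=0
    }
}
```
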
> >
> > Can we make a pull request?
> >
> >         If the community deems a revision necessary, we hope this task can
> > be handed over to us. Our members are all students who have just graduated,
> > and contributing code to Flink would be a great encouragement for us.
> >
> >         Thank you so much!
> >
> >         From Deng Ziqi & Lin Wanni & Guo Yuanfang
> >
> >
> >  ===========================================
> > reproduce
> >
> > /* output
> > WindowStart: -15000    ExactSize:1    (a,-17000)
> > WindowStart: -10000    ExactSize:1    (b,-12000)
> > WindowStart: -5000     ExactSize:2    (c,-7000)
> > WindowStart: -5000     ExactSize:2    (d,-2000)
> > WindowStart: 0         ExactSize:1    (e,3000)
> > WindowStart: 5000      ExactSize:1    (f,8000)
> > WindowStart: 10000     ExactSize:1    (g,13000)
> > WindowStart: 15000     ExactSize:1    (h,18000)
> >  */
> > import java.util.TimeZone;
> >
> > import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
> > import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> > import org.apache.flink.api.java.tuple.Tuple2;
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> > import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> > import org.apache.flink.streaming.api.windowing.time.Time;
> > import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> > import org.apache.flink.util.Collector;
> >
> > public class Example {
> >     public static void main(String[] args) throws Exception {
> >
> >         final TimeZone timeZone = TimeZone.getTimeZone("GMT+0");
> >         TimeZone.setDefault(timeZone);
> >         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >         env
> >                 .setParallelism(1)
> >                 // Elements with negative and positive event timestamps, 5 s apart.
> >                 .fromElements(
> >                         Tuple2.of("a", -17 * 1000L),
> >                         Tuple2.of("b", -12 * 1000L),
> >                         Tuple2.of("c", -7 * 1000L),
> >                         Tuple2.of("d", -2 * 1000L),
> >                         Tuple2.of("e", 3 * 1000L),
> >                         Tuple2.of("f", 8 * 1000L),
> >                         Tuple2.of("g", 13 * 1000L),
> >                         Tuple2.of("h", 18 * 1000L)
> >                 )
> >                 .assignTimestampsAndWatermarks(
> >                         WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
> >                                 .withTimestampAssigner(
> >                                         new SerializableTimestampAssigner<Tuple2<String, Long>>() {
> >                                             @Override
> >                                             public long extractTimestamp(Tuple2<String, Long> element, long l) {
> >                                                 // The second tuple field is the event timestamp.
> >                                                 return element.f1;
> >                                             }
> >                                         }
> >                                 )
> >                 )
> >                 .keyBy(r -> 1)
> >                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
> >                 .process(
> >                         new ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>() {
> >                             @Override
> >                             public void process(
> >                                     Integer integer,
> >                                     ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>.Context context,
> >                                     Iterable<Tuple2<String, Long>> elements,
> >                                     Collector<String> out) throws Exception {
> >                                 // Emit one line per element with the start of its assigned window.
> >                                 for (Tuple2<String, Long> element : elements) {
> >                                     out.collect("WindowStart: " + context.window().getStart()
> >                                             + "\tExactSize:" + elements.spliterator().getExactSizeIfKnown() + "\t"
> >                                             + element
> >                                     );
> >                                 }
> >                             }
> >                         }
> >                 )
> >                 .print();
> >         env.execute();
> >     }
> > }
> >
>
