Re: [DISCUSS]当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口

2022-03-03 文章 yidan zhao
嗯,问题的确存在。  只是场景有点特别,ts一般是时间戳,ts本身负数或很小的情况这个我没考虑。

邓子琦  于2022年3月4日周五 14:28写道:

> 不是的,这是一个现存问题
>
> abs(offset)>size的约束并不能让ts-offset+size必然大于0
>
> 图中给出的示例是我们用代码验证过的。你可以尝试运行下面的代码
>
> 会发现它暴露出来的问题跟我所描述的一样
>
> /* output
>
> *窗口开始时间是-15000  有1个元素   数据是 (a,-17000)
>
> *窗口开始时间是-1  有1个元素   数据是 (b,-12000)
>
> *窗口开始时间是-5000   有2个元素   数据是 (c,-7000)
>
> *窗口开始时间是-5000   有2个元素   数据是 (d,-2000)
>
> *窗口开始时间是0   有1个元素   数据是 (e,3000)
>
> *窗口开始时间是5000有1个元素   数据是 (f,8000)
>
> *窗口开始时间是1   有1个元素   数据是 (g,13000)
>
> *窗口开始时间是15000   有1个元素   数据是 (h,18000)
>
> */
>
> public class Example {
>
> public static void main(String[] args) throws Exception {
>
> final TimeZone timeZone = TimeZone.getTimeZone("GTM+0");
>
> TimeZone.setDefault(timeZone);
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env
>
> .setParallelism(1)
>
> .fromElements(
>
> Tuple2.of("a",-17*1000L),
>
> Tuple2.of("b",-12*1000L),
>
> Tuple2.of("c",-7*1000L),
>
> Tuple2.of("d",-2*1000L),
>
> Tuple2.of("e",3*1000L),
>
> Tuple2.of("f",8*1000L),
>
> Tuple2.of("g",13*1000L),
>
> Tuple2.of("h",18*1000L)
>
> )
>
> .assignTimestampsAndWatermarks(
>
>
> WatermarkStrategy.>forMonotonousTimestamps()
>
> .withTimestampAssigner(
>
> new
> SerializableTimestampAssigner>() {
>
> @Override
>
> public long
> extractTimestamp(Tuple2 element, long l) {
>
> return element.f1;
>
> }
>
> }
>
> )
>
> )
>
> .keyBy(r->1)
>
> .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>
> .process(
>
> new ProcessWindowFunction Long>, String, Integer, TimeWindow>() {
>
> @Override
>
> public void process(Integer integer,
> ProcessWindowFunction, String, Integer,
> TimeWindow>.Context context, Iterable> elements,
> Collector out) throws Exception {
>
> for (Tuple2 element :
> elements) {
>
>
> out.collect("窗口开始时间是\t"+context.window().getStart()
>
> + "\t有" +
> elements.spliterator().getExactSizeIfKnown()+"个元素"
>
> + "\t\t数据是\t"+ element
>
> );
>
> }
>
> }
>
> }
>
> )
>
> .print();
>
> env.execute();
>
> }
>
> }
>
>
> yidan zhao  于2022年3月4日周五 10:30写道:
>
> > 1 在flink中调用这个方法的部分是 windowAssigner,以TumblingEventTimeWindows
> > 为例,分配window的时候的逻辑为:
> >
> > long start =
> > TimeWindow.getWindowStartWithOffset(
> > timestamp, (globalOffset + staggerOffset) % size, size);
> >
> >
> > 构造函数中offset逻辑:
> >
> > protected TumblingEventTimeWindows(long size, long offset,
> > WindowStagger windowStagger) {
> > if (Math.abs(offset) >= size) {
> > throw new IllegalArgumentException(
> > "TumblingEventTimeWindows parameters must satisfy
> > abs(offset) < size");
> > }
> >
> > this.size = size;
> > this.globalOffset = offset;
> > this.windowStagger = windowStagger;
> > }
> >
> >
> > 之前的版本中没有windowStagger的逻辑,只考虑offst,构造函数中对abs(offset)>size做了限制。因此
> > ts-offset+size 是 >0的。当然新引入的windowStagger的确可能引入这个问题。
> >
> > 邓子琦  于2022年3月3日周四 19:49写道:
> >
> > > 好滴 谢谢
> > >
> > > yu'an huang  于2022年3月3日周四 18:17写道:
> > >
> > > >
> > > > 我想你们可以为Flink贡献代码。只要按照guide
> > > > https://flink.apache.org/contributing/contribute-code.html <
> > > >
> > >
> >
> https://flink.apache.org/contributing/contribute-code.html%E4%B8%AD%E7%9A%84%E6%AD%A5%E9%AA%A4%E5%85%88%E5%BB%BA%E7%AB%8BJIRA
> > > > >
> > > >
> > > > 建立JIRA Ticket然后讨论就可以了,为了社区可以更方便的review你们的代码。
> > > >
> > > >
> > > >
> > > > > On 3 Mar 2022, at 5:33 PM, 邓子琦  wrote:
> > > > >
> > > > > 当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口
> > > > >
> > > > > 问题
> > > > >
> > > > > 你好!
> > > > > 我们在学习flink源码时,发现它计算窗口开始时间的算法存在问题。当timestamp - offset + windowSize
> < 0
> > > > 时,元素会被错误地分配到比自身时间戳大一个WindowSize的窗口里去。
> > > > > 问题在org.apache.flink.streaming.api.windowing.windows.TimeWindow
> > > > >public static long getWindowStartWithOffset(long timestamp, long
> > > > offset, long windowSize) {
> > > > >return timestamp - (timestamp - offset + windowSize) %
> > > windowSize;
> 

Re: [DISCUSS]当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口

2022-03-03 文章 邓子琦
不是的,这是一个现存问题

abs(offset)>size的约束并不能让ts-offset+size必然大于0

图中给出的示例是我们用代码验证过的。你可以尝试运行下面的代码

会发现它暴露出来的问题跟我所描述的一样

/* output

*窗口开始时间是-15000  有1个元素   数据是 (a,-17000)

*窗口开始时间是-1  有1个元素   数据是 (b,-12000)

*窗口开始时间是-5000   有2个元素   数据是 (c,-7000)

*窗口开始时间是-5000   有2个元素   数据是 (d,-2000)

*窗口开始时间是0   有1个元素   数据是 (e,3000)

*窗口开始时间是5000有1个元素   数据是 (f,8000)

*窗口开始时间是1   有1个元素   数据是 (g,13000)

*窗口开始时间是15000   有1个元素   数据是 (h,18000)

*/

public class Example {

public static void main(String[] args) throws Exception {

final TimeZone timeZone = TimeZone.getTimeZone("GTM+0");

TimeZone.setDefault(timeZone);

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

env

.setParallelism(1)

.fromElements(

Tuple2.of("a",-17*1000L),

Tuple2.of("b",-12*1000L),

Tuple2.of("c",-7*1000L),

Tuple2.of("d",-2*1000L),

Tuple2.of("e",3*1000L),

Tuple2.of("f",8*1000L),

Tuple2.of("g",13*1000L),

Tuple2.of("h",18*1000L)

)

.assignTimestampsAndWatermarks(


WatermarkStrategy.>forMonotonousTimestamps()

.withTimestampAssigner(

new
SerializableTimestampAssigner>() {

@Override

public long
extractTimestamp(Tuple2 element, long l) {

return element.f1;

}

}

)

)

.keyBy(r->1)

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

.process(

new ProcessWindowFunction, String, Integer, TimeWindow>() {

@Override

public void process(Integer integer,
ProcessWindowFunction, String, Integer,
TimeWindow>.Context context, Iterable> elements,
Collector out) throws Exception {

for (Tuple2 element : elements) {


out.collect("窗口开始时间是\t"+context.window().getStart()

+ "\t有" +
elements.spliterator().getExactSizeIfKnown()+"个元素"

+ "\t\t数据是\t"+ element

);

}

}

}

)

.print();

env.execute();

}

}


yidan zhao  于2022年3月4日周五 10:30写道:

> 1 在flink中调用这个方法的部分是 windowAssigner,以TumblingEventTimeWindows
> 为例,分配window的时候的逻辑为:
>
> long start =
> TimeWindow.getWindowStartWithOffset(
> timestamp, (globalOffset + staggerOffset) % size, size);
>
>
> 构造函数中offset逻辑:
>
> protected TumblingEventTimeWindows(long size, long offset,
> WindowStagger windowStagger) {
> if (Math.abs(offset) >= size) {
> throw new IllegalArgumentException(
> "TumblingEventTimeWindows parameters must satisfy
> abs(offset) < size");
> }
>
> this.size = size;
> this.globalOffset = offset;
> this.windowStagger = windowStagger;
> }
>
>
> 之前的版本中没有windowStagger的逻辑,只考虑offst,构造函数中对abs(offset)>size做了限制。因此
> ts-offset+size 是 >0的。当然新引入的windowStagger的确可能引入这个问题。
>
> 邓子琦  于2022年3月3日周四 19:49写道:
>
> > 好滴 谢谢
> >
> > yu'an huang  于2022年3月3日周四 18:17写道:
> >
> > >
> > > 我想你们可以为Flink贡献代码。只要按照guide
> > > https://flink.apache.org/contributing/contribute-code.html <
> > >
> >
> https://flink.apache.org/contributing/contribute-code.html%E4%B8%AD%E7%9A%84%E6%AD%A5%E9%AA%A4%E5%85%88%E5%BB%BA%E7%AB%8BJIRA
> > > >
> > >
> > > 建立JIRA Ticket然后讨论就可以了,为了社区可以更方便的review你们的代码。
> > >
> > >
> > >
> > > > On 3 Mar 2022, at 5:33 PM, 邓子琦  wrote:
> > > >
> > > > 当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口
> > > >
> > > > 问题
> > > >
> > > > 你好!
> > > > 我们在学习flink源码时,发现它计算窗口开始时间的算法存在问题。当timestamp - offset + windowSize < 0
> > > 时,元素会被错误地分配到比自身时间戳大一个WindowSize的窗口里去。
> > > > 问题在org.apache.flink.streaming.api.windowing.windows.TimeWindow
> > > >public static long getWindowStartWithOffset(long timestamp, long
> > > offset, long windowSize) {
> > > >return timestamp - (timestamp - offset + windowSize) %
> > windowSize;
> > > >   }
> > > >
> > > >
> > >
> >
> 我们认为,这违背了时间和窗口之间的逻辑。也就是一个元素应该落在开始时间小于自身戳时间戳且结束时间大于自身时间戳的窗口里去。但是,目前的情况是当timestamp
> > > - offset + windowSize < 0  时,元素会落在一个未来的时间窗口中。
> > > > 解决方法
> > > >
> > > > 其实原算法在python中是没问题,导致这个问题的关键在于,编程语言对求余运算的处理。
> > > > 最后,我们认为应当修改为下述算法。
> > > >public static long getWindowStartWithOffset(long timestamp, long
> > > offset, long windowSize) {
> > > >return timestamp

CDC 分表同步快照顺序

2022-03-03 文章 刘 家锹
Hi, all

我们在使用Flink CDC同步多张表,然后合并slink到一张es表中。但表之间有数据流转关系,比如有table_1, table_2, table2,  
一条数据A之前table_1,但后续可能更新到table_2。
想请教下,如果使用正则表达式匹配同步分表,是否可以保证数据有序无误呢? 也就是全部分表同时快照,且等待所有分表快照同步完后才开始处理binlog。
从文档[1]中看到对于单表这种模式是可以保证的,但不确定多表且有数据流转是否也一样。



[1] 
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html
MySQL CDC Connector — Flink CDC 
documentation
SQL Client JAR¶. Download link is available only for stable releases.. Download 
flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar and put it under 
/lib/.. Setup MySQL server¶. You have to define a MySQL user with 
appropriate permissions on all databases that the Debezium MySQL connector 
monitors.
ververica.github.io



Re: flink 反压导致checkpoint超时,从而导致任务失败问题

2022-03-03 文章 yu'an huang
你好,我检查了下关于checkpoint的文档:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/
 

tolerable checkpoint failure number: This defines how many consecutive 
checkpoint failures will be tolerated, before the whole job is failed over. The 
default value is 0, which means no checkpoint failures will be tolerated, and 
the job will fail on first reported checkpoint failure.

可以为作业设置容忍checkpoint失败的, 你可以像文档中说加下相关设置:
// only two consecutive checkpoint failures are tolerated
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
希望可以帮到你




> On 4 Mar 2022, at 10:07 AM, yu'an huang  > wrote:
> 
> 你好,checkpoint超时默认不会导致作业重启,可以提供下JM log看看作业为什么会重启吗?
> 
>> On 3 Mar 2022, at 9:15 PM, kong <62...@163.com > wrote:
>> 
>> hello,我最近遇到一个问题:
>> 我通过flink消费kafka数据,job 图大概是这样的:Source -> map -> filter -> flatMap -> Map -> 
>> Sink
>> 在一瞬间kafka的producer端会产生大量的数据,导致flink无法消费完,我的checkpoint设置的是10分钟;
>> 最后会产生Checkpoint expired before 
>> completing.的错误,导致job重启,从而导致从上一个checkpoint恢复,然后重复消费数据,又导致checkpoint超时,死循环了。
>> 
>> 
>> 不知道有什么好办法解决该问题。
>> 多谢~
>> 
> 



Re: [DISCUSS]当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口

2022-03-03 文章 yidan zhao
1 在flink中调用这个方法的部分是 windowAssigner,以TumblingEventTimeWindows
为例,分配window的时候的逻辑为:

long start =
TimeWindow.getWindowStartWithOffset(
timestamp, (globalOffset + staggerOffset) % size, size);


构造函数中offset逻辑:

protected TumblingEventTimeWindows(long size, long offset,
WindowStagger windowStagger) {
if (Math.abs(offset) >= size) {
throw new IllegalArgumentException(
"TumblingEventTimeWindows parameters must satisfy
abs(offset) < size");
}

this.size = size;
this.globalOffset = offset;
this.windowStagger = windowStagger;
}


之前的版本中没有windowStagger的逻辑,只考虑offst,构造函数中对abs(offset)>size做了限制。因此
ts-offset+size 是 >0的。当然新引入的windowStagger的确可能引入这个问题。

邓子琦  于2022年3月3日周四 19:49写道:

> 好滴 谢谢
>
> yu'an huang  于2022年3月3日周四 18:17写道:
>
> >
> > 我想你们可以为Flink贡献代码。只要按照guide
> > https://flink.apache.org/contributing/contribute-code.html <
> >
> https://flink.apache.org/contributing/contribute-code.html%E4%B8%AD%E7%9A%84%E6%AD%A5%E9%AA%A4%E5%85%88%E5%BB%BA%E7%AB%8BJIRA
> > >
> >
> > 建立JIRA Ticket然后讨论就可以了,为了社区可以更方便的review你们的代码。
> >
> >
> >
> > > On 3 Mar 2022, at 5:33 PM, 邓子琦  wrote:
> > >
> > > 当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口
> > >
> > > 问题
> > >
> > > 你好!
> > > 我们在学习flink源码时,发现它计算窗口开始时间的算法存在问题。当timestamp - offset + windowSize < 0
> > 时,元素会被错误地分配到比自身时间戳大一个WindowSize的窗口里去。
> > > 问题在org.apache.flink.streaming.api.windowing.windows.TimeWindow
> > >public static long getWindowStartWithOffset(long timestamp, long
> > offset, long windowSize) {
> > >return timestamp - (timestamp - offset + windowSize) %
> windowSize;
> > >   }
> > >
> > >
> >
> 我们认为,这违背了时间和窗口之间的逻辑。也就是一个元素应该落在开始时间小于自身戳时间戳且结束时间大于自身时间戳的窗口里去。但是,目前的情况是当timestamp
> > - offset + windowSize < 0  时,元素会落在一个未来的时间窗口中。
> > > 解决方法
> > >
> > > 其实原算法在python中是没问题,导致这个问题的关键在于,编程语言对求余运算的处理。
> > > 最后,我们认为应当修改为下述算法。
> > >public static long getWindowStartWithOffset(long timestamp, long
> > offset, long windowSize) {
> > >return timestamp
> > >- (timestamp - offset) % windowSize
> > >- (windowSize & (timestamp - offset) >> 63);
> > >   }
> > > (windowSize & (timestamp - offset) >> 63) 这个式子的作用是当(timestamp -
> > offset)<0 时整体的运算结果减去windowSize,否则就什么都不做。这样我们就实现了向负无穷的取整。
> > > 这样,我们可以让时间戳为负数时,元素也能被分配到正确的窗口中。
> > > 这段代码可以通过目前的单元测试。
> > > 其他包中的 getWindowStartWithOffset 方法
> > >
> > >
> >
> 想到getWindowStartWithOffset中应该有不少地方要用到。我们在项目中搜索了这个方法,发现flink.table中就处理了负时间戳的问题。
> > > 下面是他们的源码。
> > >
> org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping
> > >private long getWindowStartWithOffset(long timestamp, long offset,
> > long windowSize) {
> > >long remainder = (timestamp - offset) % windowSize;
> > >// handle both positive and negative cases
> > >if (remainder < 0) {
> > >return timestamp - (remainder + windowSize);
> > >   } else {
> > >return timestamp - remainder;
> > >   }
> > >   }
> > > 我们可以发起一个pull request吗?
> > >
> > > 如果社区认为它有修改的必要,希望能把这项任务交给我们。我们的成员都是刚从学校毕业不久的学生,能为flink贡献代码对我们来说是极大的鼓舞。
> > > 十分感谢!
> > > 这是我们在 jira 上创建的 issue
> https://issues.apache.org/jira/browse/FLINK-26334
> > 
> > > 来自 邓子琦 & 林婉妮 & 郭元方
> > >
> >
> >
>


Re: flink 反压导致checkpoint超时,从而导致任务失败问题

2022-03-03 文章 yu'an huang
你好,checkpoint超时默认不会导致作业重启,可以提供下JM log看看作业为什么会重启吗?

> On 3 Mar 2022, at 9:15 PM, kong <62...@163.com> wrote:
> 
> hello,我最近遇到一个问题:
> 我通过flink消费kafka数据,job 图大概是这样的:Source -> map -> filter -> flatMap -> Map -> 
> Sink
> 在一瞬间kafka的producer端会产生大量的数据,导致flink无法消费完,我的checkpoint设置的是10分钟;
> 最后会产生Checkpoint expired before 
> completing.的错误,导致job重启,从而导致从上一个checkpoint恢复,然后重复消费数据,又导致checkpoint超时,死循环了。
> 
> 
> 不知道有什么好办法解决该问题。
> 多谢~
> 



Flink??????????????????,????????????

2022-03-03 文章 Tony
cpu,??Flink???  32, 
512G, Flink???

flink 反压导致checkpoint超时,从而导致任务失败问题

2022-03-03 文章 kong
hello,我最近遇到一个问题:
我通过flink消费kafka数据,job 图大概是这样的:Source -> map -> filter -> flatMap -> Map -> Sink
在一瞬间kafka的producer端会产生大量的数据,导致flink无法消费完,我的checkpoint设置的是10分钟;
最后会产生Checkpoint expired before 
completing.的错误,导致job重启,从而导致从上一个checkpoint恢复,然后重复消费数据,又导致checkpoint超时,死循环了。


不知道有什么好办法解决该问题。
多谢~



??????????????????kafka??????????????kafka??????????????????????????

2022-03-03 文章 jianjianjianjianjianjianjianjian

  
kafka??kafkakafkaDebug
  ??AbstractFetcher.java 
??emitRecordsWithTimestamps 
??for??
  ?? AbstractFetcher 
emitRecordsWithTimestamps??
protected void emitRecordsWithTimestamps(
Queue

Re: [DISCUSS]当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口

2022-03-03 文章 邓子琦
好滴 谢谢

yu'an huang  于2022年3月3日周四 18:17写道:

>
> 我想你们可以为Flink贡献代码。只要按照guide
> https://flink.apache.org/contributing/contribute-code.html <
> https://flink.apache.org/contributing/contribute-code.html%E4%B8%AD%E7%9A%84%E6%AD%A5%E9%AA%A4%E5%85%88%E5%BB%BA%E7%AB%8BJIRA
> >
>
> 建立JIRA Ticket然后讨论就可以了,为了社区可以更方便的review你们的代码。
>
>
>
> > On 3 Mar 2022, at 5:33 PM, 邓子琦  wrote:
> >
> > 当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口
> >
> > 问题
> >
> > 你好!
> > 我们在学习flink源码时,发现它计算窗口开始时间的算法存在问题。当timestamp - offset + windowSize < 0
> 时,元素会被错误地分配到比自身时间戳大一个WindowSize的窗口里去。
> > 问题在org.apache.flink.streaming.api.windowing.windows.TimeWindow
> >public static long getWindowStartWithOffset(long timestamp, long
> offset, long windowSize) {
> >return timestamp - (timestamp - offset + windowSize) % windowSize;
> >   }
> >
> >
> 我们认为,这违背了时间和窗口之间的逻辑。也就是一个元素应该落在开始时间小于自身戳时间戳且结束时间大于自身时间戳的窗口里去。但是,目前的情况是当timestamp
> - offset + windowSize < 0  时,元素会落在一个未来的时间窗口中。
> > 解决方法
> >
> > 其实原算法在python中是没问题,导致这个问题的关键在于,编程语言对求余运算的处理。
> > 最后,我们认为应当修改为下述算法。
> >public static long getWindowStartWithOffset(long timestamp, long
> offset, long windowSize) {
> >return timestamp
> >- (timestamp - offset) % windowSize
> >- (windowSize & (timestamp - offset) >> 63);
> >   }
> > (windowSize & (timestamp - offset) >> 63) 这个式子的作用是当(timestamp -
> offset)<0 时整体的运算结果减去windowSize,否则就什么都不做。这样我们就实现了向负无穷的取整。
> > 这样,我们可以让时间戳为负数时,元素也能被分配到正确的窗口中。
> > 这段代码可以通过目前的单元测试。
> > 其他包中的 getWindowStartWithOffset 方法
> >
> >
> 想到getWindowStartWithOffset中应该有不少地方要用到。我们在项目中搜索了这个方法,发现flink.table中就处理了负时间戳的问题。
> > 下面是他们的源码。
> > org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping
> >private long getWindowStartWithOffset(long timestamp, long offset,
> long windowSize) {
> >long remainder = (timestamp - offset) % windowSize;
> >// handle both positive and negative cases
> >if (remainder < 0) {
> >return timestamp - (remainder + windowSize);
> >   } else {
> >return timestamp - remainder;
> >   }
> >   }
> > 我们可以发起一个pull request吗?
> >
> > 如果社区认为它有修改的必要,希望能把这项任务交给我们。我们的成员都是刚从学校毕业不久的学生,能为flink贡献代码对我们来说是极大的鼓舞。
> > 十分感谢!
> > 这是我们在 jira 上创建的 issue https://issues.apache.org/jira/browse/FLINK-26334
> 
> > 来自 邓子琦 & 林婉妮 & 郭元方
> >
>
>


Re: [DISCUSS]当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口

2022-03-03 文章 yu'an huang

我想你们可以为Flink贡献代码。只要按照guide 
https://flink.apache.org/contributing/contribute-code.html 


建立JIRA Ticket然后讨论就可以了,为了社区可以更方便的review你们的代码。



> On 3 Mar 2022, at 5:33 PM, 邓子琦  wrote:
> 
> 当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口
> 
> 问题
> 
> 你好!
> 我们在学习flink源码时,发现它计算窗口开始时间的算法存在问题。当timestamp - offset + windowSize < 0 
> 时,元素会被错误地分配到比自身时间戳大一个WindowSize的窗口里去。
> 问题在org.apache.flink.streaming.api.windowing.windows.TimeWindow
>public static long getWindowStartWithOffset(long timestamp, long offset, 
> long windowSize) {
>return timestamp - (timestamp - offset + windowSize) % windowSize;
>   }
> 
> 我们认为,这违背了时间和窗口之间的逻辑。也就是一个元素应该落在开始时间小于自身戳时间戳且结束时间大于自身时间戳的窗口里去。但是,目前的情况是当timestamp
>  - offset + windowSize < 0  时,元素会落在一个未来的时间窗口中。
> 解决方法
> 
> 其实原算法在python中是没问题,导致这个问题的关键在于,编程语言对求余运算的处理。
> 最后,我们认为应当修改为下述算法。
>public static long getWindowStartWithOffset(long timestamp, long offset, 
> long windowSize) {
>return timestamp
>- (timestamp - offset) % windowSize
>- (windowSize & (timestamp - offset) >> 63);
>   }
> (windowSize & (timestamp - offset) >> 63) 这个式子的作用是当(timestamp - offset)<0 
> 时整体的运算结果减去windowSize,否则就什么都不做。这样我们就实现了向负无穷的取整。
> 这样,我们可以让时间戳为负数时,元素也能被分配到正确的窗口中。
> 这段代码可以通过目前的单元测试。
> 其他包中的 getWindowStartWithOffset 方法
> 
> 想到getWindowStartWithOffset中应该有不少地方要用到。我们在项目中搜索了这个方法,发现flink.table中就处理了负时间戳的问题。
> 下面是他们的源码。
> org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping
>private long getWindowStartWithOffset(long timestamp, long offset, long 
> windowSize) {
>long remainder = (timestamp - offset) % windowSize;
>// handle both positive and negative cases
>if (remainder < 0) {
>return timestamp - (remainder + windowSize);
>   } else {
>return timestamp - remainder;
>   }
>   }
> 我们可以发起一个pull request吗?
> 
> 如果社区认为它有修改的必要,希望能把这项任务交给我们。我们的成员都是刚从学校毕业不久的学生,能为flink贡献代码对我们来说是极大的鼓舞。
> 十分感谢!
> 这是我们在 jira 上创建的 issue https://issues.apache.org/jira/browse/FLINK-26334 
> 
> 来自 邓子琦 & 林婉妮 & 郭元方
> 



[DISCUSS]当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口

2022-03-03 文章 邓子琦
当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口问题

你好!

我们在学习flink源码时,发现它计算窗口开始时间的算法存在问题。当timestamp - offset + windowSize < 0
时,元素会被错误地分配到比自身时间戳大一个WindowSize的窗口里去。

问题在org.apache.flink.streaming.api.windowing.windows.TimeWindow

public static long getWindowStartWithOffset(long timestamp, long
offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}

[image: image-20220303144539703.png]

我们认为,这违背了时间和窗口之间的逻辑。也就是*一个元素应该落在开始时间小于自身戳时间戳且结束时间大于自身时间戳的窗口里去*。但是,目前的情况是当timestamp
- offset + windowSize < 0 时,*元素会落在一个未来的时间窗口中*。
解决方法

其实原算法在python中是没问题,导致这个问题的关键在于,编程语言对求余运算的处理。

最后,我们认为应当修改为下述算法。

public static long getWindowStartWithOffset(long timestamp, long
offset, long windowSize) {
return timestamp
- (timestamp - offset) % windowSize
- (windowSize & (timestamp - offset) >> 63);
}

(windowSize & (timestamp - offset) >> 63) 这个式子的作用是当(timestamp - offset)<0
时整体的运算结果减去windowSize,否则就什么都不做。这样我们就实现了向负无穷的取整。

这样,我们可以让时间戳为负数时,元素也能被分配到正确的窗口中。[image: image-20220303144558158.png]

这段代码可以通过目前的单元测试。
其他包中的 getWindowStartWithOffset 方法

想到getWindowStartWithOffset中应该有不少地方要用到。我们在项目中搜索了这个方法,发现flink.table
中就处理了负时间戳的问题。

下面是他们的源码。

org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping

private long getWindowStartWithOffset(long timestamp, long offset,
long windowSize) {
long remainder = (timestamp - offset) % windowSize;
// handle both positive and negative cases
if (remainder < 0) {
return timestamp - (remainder + windowSize);
} else {
return timestamp - remainder;
}
}

我们可以发起一个pull request吗?

如果社区认为它有修改的必要,希望能把这项任务交给我们。我们的成员都是刚从学校毕业不久的学生,能为flink贡献代码对我们来说是极大的鼓舞。

十分感谢!

这是我们在 jira 上创建的 issue https://issues.apache.org/jira/browse/FLINK-26334

来自 邓子琦 & 林婉妮 & 郭元方