当timestamp - offset + windowSize < 0时,元素无法被分配到正确的窗口问题

你好!

我们在学习flink源码时,发现它计算窗口开始时间的算法存在问题。当timestamp - offset + windowSize < 0
时,元素会被错误地分配到比自身时间戳大一个WindowSize的窗口里去。

问题在org.apache.flink.streaming.api.windowing.windows.TimeWindow

    public static long getWindowStartWithOffset(long timestamp, long
offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

[image: image-20220303144539703.png]

我们认为,这违背了时间和窗口之间的逻辑。也就是*一个元素应该落在开始时间小于自身戳时间戳且结束时间大于自身时间戳的窗口里去*。但是,目前的情况是当timestamp
- offset + windowSize < 0 时,*元素会落在一个未来的时间窗口中*。
解决方法

其实原算法在python中是没问题,导致这个问题的关键在于,编程语言对求余运算的处理。

最后,我们认为应当修改为下述算法。

    public static long getWindowStartWithOffset(long timestamp, long
offset, long windowSize) {
        return timestamp
                - (timestamp - offset) % windowSize
                - (windowSize & (timestamp - offset) >> 63);
    }

(windowSize & (timestamp - offset) >> 63) 这个式子的作用是当(timestamp - offset)<0
时整体的运算结果减去windowSize,否则就什么都不做。这样我们就实现了向负无穷的取整。

这样,我们可以让时间戳为负数时,元素也能被分配到正确的窗口中。[image: image-20220303144558158.png]

这段代码可以通过目前的单元测试。
其他包中的 getWindowStartWithOffset 方法

想到getWindowStartWithOffset中应该有不少地方要用到。我们在项目中搜索了这个方法,发现flink.table
中就处理了负时间戳的问题。

下面是他们的源码。

org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping

    private long getWindowStartWithOffset(long timestamp, long offset,
long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // handle both positive and negative cases
        if (remainder < 0) {
            return timestamp - (remainder + windowSize);
        } else {
            return timestamp - remainder;
        }
    }

我们可以发起一个pull request吗?

如果社区认为它有修改的必要,希望能把这项任务交给我们。我们的成员都是刚从学校毕业不久的学生,能为flink贡献代码对我们来说是极大的鼓舞。

十分感谢!

这是我们在 jira 上创建的 issue https://issues.apache.org/jira/browse/FLINK-26334

来自 邓子琦 & 林婉妮 & 郭元方

回复