Hi, 我看你使用了System.currentTimeMillis(),有可能是分布式的情况下,多台TM上的机器时间不一致导致的吗?
-- Best! Xuyang 在 2024-04-20 19:04:14,"hhq" <424028...@qq.com.INVALID> 写道: >我使用了一个基于处理时间的滚动窗口,窗口大小设置为60s,但是我在窗口的处理函数中比较窗口的结束时间和系统时间,偶尔会发现获取到的系统时间早于窗口结束时间(这里的提前量不大,只有几毫秒,但是我不清楚,这是flink窗口本身的原因还是我代码的问题)我没有找到原因,请求帮助 > >public static void main(String[] args) throws Exception { > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStreamSource<Integer> integerDataStreamSource = env.addSource(new > IntegerSource()); > > integerDataStreamSource > .keyBy(Integer::intValue) > .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) > .process(new IntegerProcessFunction()) > .setParallelism(1); > > env.execute(); >} > > >public class IntegerProcessFunction extends ProcessWindowFunction<Integer, >Object, Integer, TimeWindow> { > private Logger log; > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > this.log = Logger.getLogger(IntegerProcessFunction.class); > } > > @Override > public void process(Integer integer, ProcessWindowFunction<Integer, > Object, Integer, TimeWindow>.Context context, Iterable<Integer> elements, > Collector<Object> out) throws Exception { > long currentTimeMillis = System.currentTimeMillis(); > long end = context.window().getEnd(); > > if (currentTimeMillis < end) { > log <http://log.info/>.info <http://log.info/>("bad"); > } else { > log <http://log.info/>.info <http://log.info/>("good"); > } > } >} >