Hi, > 那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可? 对,具体可以参考下这个内部实现的算子[1]
> 新的sink > v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis() > - context.timestamp()得到sink延迟呢? 应该是可以的,就是可能因为各tm的机器时间会有略微差异的情况,不会特别准,但是应该也够用了。 [1] https://github.com/apache/flink/blob/e7e973e212d0ca04855af3036fc5b73888b8e0e5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java#L314 -- Best! Xuyang 在 2024-02-21 15:17:49,"casel.chen" <casel_c...@126.com> 写道: >感谢!那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可? >我看新的sink >v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis() > - context.timestamp()得到sink延迟呢? > > > > > > > > > > > > > > >在 2024-02-21 09:41:37,"Xuyang" <xyzhong...@163.com> 写道: >>Hi, chen. >>可以试一下在sink function的invoke函数中使用: >> >> >> @Override >> public void invoke(RowData row, Context context) throws Exception { >> context.currentProcessingTime(); >> context.currentWatermark(); >> ... >> } >> >> >> >> >> >> >> >>-- >> >> Best! >> Xuyang >> >> >> >> >> >>在 2024-02-20 19:38:44,"Feng Jin" <jinfeng1...@gmail.com> 写道: >>>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime. >>> >>>Best, >>>Feng >>> >>>On Tue, Feb 20, 2024 at 4:35 PM casel.chen <casel_c...@126.com> wrote: >>> >>>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark? >>>> >>>> >>>> public class XxxSinkFunction extends RichSinkFunction<RowData> implements >>>> CheckpointedFunction, CheckpointListener { >>>> >>>> >>>> @Override >>>> public synchronized void invoke(RowData rowData, Context context) >>>> throws IOException { >>>> // 这里想从rowData中获取event time和watermark值,如何实现呢? >>>> } >>>> } >>>> >>>> >>>> 例如source table如下定义 >>>> >>>> >>>> CREATE TEMPORARY TABLE source_table( >>>> username varchar, >>>> click_url varchar, >>>> eventtime varchar, >>>> >>>> ts AS TO_TIMESTAMP(eventtime), >>>> WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。 >>>> ) with ( >>>> 'connector'='kafka', >>>> ... >>>> >>>> ); >>>> >>>> >>>> CREATE TEMPORARY TABLE sink_table( >>>> username varchar, >>>> click_url varchar, >>>> eventtime varchar >>>> ) with ( >>>> 'connector'='xxx', >>>> ... >>>> ); >>>> insert into sink_table select username,click_url,eventtime from >>>> source_table;