Hi chen, you could try using the following in the invoke method of your sink function:

    @Override
    public void invoke(RowData row, Context context) throws Exception {
        context.currentProcessingTime();
        context.currentWatermark();
        ...
    }

--
    Best!
    Xuyang

At 2024-02-20 19:38:44, "Feng Jin" <jinfeng1...@gmail.com> wrote:
>My understanding is that these should not be read from rowData; the watermark and event time can be obtained from the Context.
>
>Best,
>Feng
>
>On Tue, Feb 20, 2024 at 4:35 PM casel.chen <casel_c...@126.com> wrote:
>
>> How can a custom sink connector in Flink SQL get the event time and watermark defined on the source table?
>>
>> public class XxxSinkFunction extends RichSinkFunction<RowData> implements
>>         CheckpointedFunction, CheckpointListener {
>>
>>     @Override
>>     public synchronized void invoke(RowData rowData, Context context)
>>             throws IOException {
>>         // I would like to get the event time and watermark values from rowData here. How can this be done?
>>     }
>> }
>>
>> For example, the source table is defined as follows:
>>
>> CREATE TEMPORARY TABLE source_table(
>>     username varchar,
>>     click_url varchar,
>>     eventtime varchar,
>>
>>     ts AS TO_TIMESTAMP(eventtime),
>>     WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define the watermark for the rowtime.
>> ) with (
>>     'connector'='kafka',
>>     ...
>> );
>>
>> CREATE TEMPORARY TABLE sink_table(
>>     username varchar,
>>     click_url varchar,
>>     eventtime varchar
>> ) with (
>>     'connector'='xxx',
>>     ...
>> );
>>
>> insert into sink_table select username,click_url,eventtime from
>> source_table;
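
For reference, here is a minimal sketch of the Context-based approach suggested in this thread. So that it runs without Flink on the classpath, it uses a hypothetical stand-in interface, SinkContext, that has the same three methods as Flink's SinkFunction.Context (currentProcessingTime(), currentWatermark(), timestamp()). The class name SinkTimeSketch and the helpers describe and fixed are also made up for illustration. In a real sink you would call these methods on the context argument of invoke instead. One assumption worth verifying against your Flink version: timestamp() can return null, and as far as I understand that may be the case when the rows reaching the sink carry no rowtime attribute, for example when the query does not select ts.

```java
public class SinkTimeSketch {

    /** Hypothetical stand-in with the same three methods as Flink's SinkFunction.Context. */
    interface SinkContext {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp(); // may be null when the record carries no timestamp
    }

    /** Summarizes the time information available for one record, handling a missing timestamp. */
    static String describe(SinkContext context) {
        Long ts = context.timestamp();
        String eventTime = (ts == null) ? "none" : ts.toString();
        return "eventTime=" + eventTime
                + ", watermark=" + context.currentWatermark()
                + ", processingTime=" + context.currentProcessingTime();
    }

    /** Builds a context with fixed values so the sketch can be tried locally. */
    static SinkContext fixed(Long timestamp, long watermark, long processingTime) {
        return new SinkContext() {
            @Override public long currentProcessingTime() { return processingTime; }
            @Override public long currentWatermark() { return watermark; }
            @Override public Long timestamp() { return timestamp; }
        };
    }

    public static void main(String[] args) {
        // A record that carries an event-time timestamp.
        System.out.println(describe(fixed(1_000L, 998L, 5_000L)));
        // A record without a timestamp: the null case is reported as "none".
        System.out.println(describe(fixed(null, 0L, 5_000L)));
    }
}
```

The null check is the main design point: reading the time information from the context rather than from rowData keeps the sink independent of which columns the query selects, but the sink then has to tolerate records that carry no timestamp.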