Hi, chen. 
可以试一下在sink function的invoke函数中使用:

    @Override
    public void invoke(RowData row, Context context) throws Exception {
        context.currentProcessingTime(); 
        context.currentWatermark(); 
        ...
    }







--

    Best!
    Xuyang





在 2024-02-20 19:38:44,"Feng Jin" <jinfeng1...@gmail.com> 写道:
>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>
>Best,
>Feng
>
>On Tue, Feb 20, 2024 at 4:35 PM casel.chen <casel_c...@126.com> wrote:
>
>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>>
>>
>> public class XxxSinkFunction extends RichSinkFunction<RowData> implements
>> CheckpointedFunction, CheckpointListener {
>>
>>
>>     @Override
>>     public synchronized void invoke(RowData rowData, Context context)
>> throws IOException {
>>            //  这里想从rowData中获取event time和watermark值,如何实现呢?
>>     }
>> }
>>
>>
>> 例如source table如下定义
>>
>>
>>     CREATE TEMPORARY TABLE source_table(
>>   username varchar,
>>   click_url varchar,
>>   eventtime varchar,
>>
>>   ts AS TO_TIMESTAMP(eventtime),
>>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
>> ) with (
>>   'connector'='kafka',
>>   ...
>>
>> );
>>
>>
>> CREATE TEMPORARY TABLE sink_table(
>>   username varchar,
>>   click_url varchar,
>>   eventtime varchar
>> ) with (
>>   'connector'='xxx',
>>   ...
>> );
>> insert into sink_table select username,click_url,eventtime from
>> source_table;

回复