感谢!那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可?
我看新的sink 
v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis()
 - context.timestamp()得到sink延迟呢?














在 2024-02-21 09:41:37,"Xuyang" <xyzhong...@163.com> 写道:
>Hi, chen. 
>可以试一下在sink function的invoke函数中使用:
>
>
>    @Override
>    public void invoke(RowData row, Context context) throws Exception {
>        context.currentProcessingTime(); 
>        context.currentWatermark(); 
>        ...
>    }
>
>
>
>
>
>
>
>--
>
>    Best!
>    Xuyang
>
>
>
>
>
>在 2024-02-20 19:38:44,"Feng Jin" <jinfeng1...@gmail.com> 写道:
>>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>>
>>Best,
>>Feng
>>
>>On Tue, Feb 20, 2024 at 4:35 PM casel.chen <casel_c...@126.com> wrote:
>>
>>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>>>
>>>
>>> public class XxxSinkFunction extends RichSinkFunction<RowData> implements
>>> CheckpointedFunction, CheckpointListener {
>>>
>>>
>>>     @Override
>>>     public synchronized void invoke(RowData rowData, Context context)
>>> throws IOException {
>>>            //  这里想从rowData中获取event time和watermark值,如何实现呢?
>>>     }
>>> }
>>>
>>>
>>> 例如source table如下定义
>>>
>>>
>>>     CREATE TEMPORARY TABLE source_table(
>>>   username varchar,
>>>   click_url varchar,
>>>   eventtime varchar,
>>>
>>>   ts AS TO_TIMESTAMP(eventtime),
>>>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
>>> ) with (
>>>   'connector'='kafka',
>>>   ...
>>>
>>> );
>>>
>>>
>>> CREATE TEMPORARY TABLE sink_table(
>>>   username varchar,
>>>   click_url varchar,
>>>   eventtime varchar
>>> ) with (
>>>   'connector'='xxx',
>>>   ...
>>> );
>>> insert into sink_table select username,click_url,eventtime from
>>> source_table;

回复