Hi, 
> 那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可?
对,具体可以参考下这个内部实现的算子[1]


> 新的sink 
> v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis()
>  - context.timestamp()得到sink延迟呢?
应该是可以的,就是可能因为各tm的机器时间会有略微差异的情况,不会特别准,但是应该也够用了。


[1] 
https://github.com/apache/flink/blob/e7e973e212d0ca04855af3036fc5b73888b8e0e5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java#L314




--

    Best!
    Xuyang





在 2024-02-21 15:17:49,"casel.chen" <casel_c...@126.com> 写道:
>感谢!那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可?
>我看新的sink 
>v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis()
> - context.timestamp()得到sink延迟呢?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2024-02-21 09:41:37,"Xuyang" <xyzhong...@163.com> 写道:
>>Hi, chen. 
>>可以试一下在sink function的invoke函数中使用:
>>
>>
>>    @Override
>>    public void invoke(RowData row, Context context) throws Exception {
>>        context.currentProcessingTime(); 
>>        context.currentWatermark(); 
>>        ...
>>    }
>>
>>
>>
>>
>>
>>
>>
>>--
>>
>>    Best!
>>    Xuyang
>>
>>
>>
>>
>>
>>在 2024-02-20 19:38:44,"Feng Jin" <jinfeng1...@gmail.com> 写道:
>>>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>>>
>>>Best,
>>>Feng
>>>
>>>On Tue, Feb 20, 2024 at 4:35 PM casel.chen <casel_c...@126.com> wrote:
>>>
>>>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>>>>
>>>>
>>>> public class XxxSinkFunction extends RichSinkFunction<RowData> implements
>>>> CheckpointedFunction, CheckpointListener {
>>>>
>>>>
>>>>     @Override
>>>>     public synchronized void invoke(RowData rowData, Context context)
>>>> throws IOException {
>>>>            //  这里想从rowData中获取event time和watermark值,如何实现呢?
>>>>     }
>>>> }
>>>>
>>>>
>>>> 例如source table如下定义
>>>>
>>>>
>>>>     CREATE TEMPORARY TABLE source_table(
>>>>   username varchar,
>>>>   click_url varchar,
>>>>   eventtime varchar,
>>>>
>>>>   ts AS TO_TIMESTAMP(eventtime),
>>>>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
>>>> ) with (
>>>>   'connector'='kafka',
>>>>   ...
>>>>
>>>> );
>>>>
>>>>
>>>> CREATE TEMPORARY TABLE sink_table(
>>>>   username varchar,
>>>>   click_url varchar,
>>>>   eventtime varchar
>>>> ) with (
>>>>   'connector'='xxx',
>>>>   ...
>>>> );
>>>> insert into sink_table select username,click_url,eventtime from
>>>> source_table;

回复