flink sql作业如何统计端到端延迟

2024-02-20 文章 casel.chen
flink sql作业从kafka消费mysql过来的canal 
json消息,经过复杂处理后写入doris,请问如何统计doris表记录的端到端时延?mysql表有update_time字段代表业务更新记录时间。
doris系统可以在表schema新增一个更新时间列ingest_time,所以在doris表上可以通过ingest_time - 
update_time算出端到端时延,但这种方法只能离线统计,有没有实时统计以方便实时监控的方法?
查了SinkFunction类的invoke方法虽然带有Context类型参数可以获取当前处理时间和事件时间,但因为大部分sink都是采用攒微批方式再批量写入的,所以这两个时间直接相减得到的时间差并不能代表真实落库的时延。有没有精确获取时延的方法呢?

Re:Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 casel.chen
感谢!那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可?
我看新的sink 
v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis()
 - context.timestamp()得到sink延迟呢?














在 2024-02-21 09:41:37,"Xuyang"  写道:
>Hi, chen. 
>可以试一下在sink function的invoke函数中使用:
>
>
>@Override
>public void invoke(RowData row, Context context) throws Exception {
>context.currentProcessingTime(); 
>context.currentWatermark(); 
>...
>}
>
>
>
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>在 2024-02-20 19:38:44,"Feng Jin"  写道:
>>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>>
>>Best,
>>Feng
>>
>>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>>
>>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>>>
>>>
>>> public class XxxSinkFunction extends RichSinkFunction implements
>>> CheckpointedFunction, CheckpointListener {
>>>
>>>
>>> @Override
>>> public synchronized void invoke(RowData rowData, Context context)
>>> throws IOException {
>>>//  这里想从rowData中获取event time和watermark值,如何实现呢?
>>> }
>>> }
>>>
>>>
>>> 例如source table如下定义
>>>
>>>
>>> CREATE TEMPORARY TABLE source_table(
>>>   username varchar,
>>>   click_url varchar,
>>>   eventtime varchar,
>>>
>>>   ts AS TO_TIMESTAMP(eventtime),
>>>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
>>> ) with (
>>>   'connector'='kafka',
>>>   ...
>>>
>>> );
>>>
>>>
>>> CREATE TEMPORARY TABLE sink_table(
>>>   username varchar,
>>>   click_url varchar,
>>>   eventtime varchar
>>> ) with (
>>>   'connector'='xxx',
>>>   ...
>>> );
>>> insert into sink_table select username,click_url,eventtime from
>>> source_table;


Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 Xuyang
Hi, chen. 
可以试一下在sink function的invoke函数中使用:


@Override
public void invoke(RowData row, Context context) throws Exception {
context.currentProcessingTime(); 
context.currentWatermark(); 
...
}







--

Best!
Xuyang





在 2024-02-20 19:38:44,"Feng Jin"  写道:
>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>
>Best,
>Feng
>
>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>
>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>>
>>
>> public class XxxSinkFunction extends RichSinkFunction implements
>> CheckpointedFunction, CheckpointListener {
>>
>>
>> @Override
>> public synchronized void invoke(RowData rowData, Context context)
>> throws IOException {
>>//  这里想从rowData中获取event time和watermark值,如何实现呢?
>> }
>> }
>>
>>
>> 例如source table如下定义
>>
>>
>> CREATE TEMPORARY TABLE source_table(
>>   username varchar,
>>   click_url varchar,
>>   eventtime varchar,
>>
>>   ts AS TO_TIMESTAMP(eventtime),
>>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
>> ) with (
>>   'connector'='kafka',
>>   ...
>>
>> );
>>
>>
>> CREATE TEMPORARY TABLE sink_table(
>>   username varchar,
>>   click_url varchar,
>>   eventtime varchar
>> ) with (
>>   'connector'='xxx',
>>   ...
>> );
>> insert into sink_table select username,click_url,eventtime from
>> source_table;


Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 Feng Jin
我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.

Best,
Feng

On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:

> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>
>
> public class XxxSinkFunction extends RichSinkFunction implements
> CheckpointedFunction, CheckpointListener {
>
>
> @Override
> public synchronized void invoke(RowData rowData, Context context)
> throws IOException {
>//  这里想从rowData中获取event time和watermark值,如何实现呢?
> }
> }
>
>
> 例如source table如下定义
>
>
> CREATE TEMPORARY TABLE source_table(
>   username varchar,
>   click_url varchar,
>   eventtime varchar,
>
>   ts AS TO_TIMESTAMP(eventtime),
>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
> ) with (
>   'connector'='kafka',
>   ...
>
> );
>
>
> CREATE TEMPORARY TABLE sink_table(
>   username varchar,
>   click_url varchar,
>   eventtime varchar
> ) with (
>   'connector'='xxx',
>   ...
> );
> insert into sink_table select username,click_url,eventtime from
> source_table;


退订

2024-02-20 文章 任香帅
退订

flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 casel.chen
请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?


public class XxxSinkFunction extends RichSinkFunction implements 
CheckpointedFunction, CheckpointListener {


@Override
public synchronized void invoke(RowData rowData, Context context) throws 
IOException {
   //  这里想从rowData中获取event time和watermark值,如何实现呢?
}
}


例如source table如下定义


CREATE TEMPORARY TABLE source_table(
  username varchar,
  click_url varchar,
  eventtime varchar,
  ts AS TO_TIMESTAMP(eventtime),
  WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
) with (
  'connector'='kafka',
  ...

);


CREATE TEMPORARY TABLE sink_table(
  username varchar,
  click_url varchar,
  eventtime varchar
) with (
  'connector'='xxx',
  ...
);
insert into sink_table select username,click_url,eventtime from source_table;