Hey Fanbin,

I’m not sure if TimeCharacteristic.IngestionTime works with Flink SQL, but if you haven’t tried setting the stream time characteristic to ingestion time it’s worth a shot. Otherwise, one possibility that comes to mind is to use a custom TimestampAssigner to set the event time to the ingestion time (this could be as simple as just returning System.currentTimeMillis() in extractAscendingTimestamp). Then in your sink you can compare the current processing time to the event’s timestamp and report the latency.
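
For what it's worth, here is a minimal sketch of that second idea in plain Java. It deliberately has no Flink dependency, so it is not a drop-in: the names LatencySketch, Stamped, stampAtIngestion and latencyMillis are made up for illustration. In a real job I'd expect the stamping to live in the timestamp assigner and the subtraction in your RichSinkFunction, next to the gauge.

```java
import java.util.function.LongSupplier;

// Plain-Java sketch of "stamp at ingestion, subtract at the sink".
// All names here are hypothetical; nothing below is Flink API.
final class LatencySketch {

    /** A record paired with the wall-clock time at which it was ingested. */
    static final class Stamped<T> {
        final T value;
        final long ingestionTimeMillis;

        Stamped(T value, long ingestionTimeMillis) {
            this.value = value;
            this.ingestionTimeMillis = ingestionTimeMillis;
        }
    }

    // Source side: in a Flink job this is the part that would sit in the
    // timestamp assigner, e.g. returning System.currentTimeMillis() from
    // extractAscendingTimestamp.
    static <T> Stamped<T> stampAtIngestion(T value, LongSupplier clock) {
        return new Stamped<>(value, clock.getAsLong());
    }

    // Sink side: latency = current processing time - ingestion timestamp.
    // This is the value a gauge in the sink would report.
    static long latencyMillis(Stamped<?> element, LongSupplier clock) {
        return clock.getAsLong() - element.ingestionTimeMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        LongSupplier wallClock = System::currentTimeMillis;
        Stamped<String> event = stampAtIngestion("order-1", wallClock);
        Thread.sleep(20); // stand-in for the time spent in the pipeline
        System.out.println("latency ms: " + latencyMillis(event, wallClock));
    }
}
```

The clock is passed in as a LongSupplier only so the logic can be checked with fixed values. Keep in mind that if the source and sink run on different hosts, the reported number also includes any clock skew between them.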

Julian

From: Fanbin Bu <fanbin...@coinbase.com>
Date: Thursday, December 10, 2020 at 3:41 PM
To: user <user@flink.apache.org>
Subject: latency monitoring

Hi,

I would like to monitor the pipeline latency measured by timestamp when writing the output to sink - timestamp when ingested from the source.

Now I'm able to get the timestamp writing to sink since the sink is implementing a RichSinkFunction and therefore I can report gauge there [1]. But I have no idea on how to get the source ingestion timestamp since I use Flink SQL create table DDL [2] to create a table and use Flink SQL for the logic. I also checked [3] which is not recommended for prod use cases.

Any suggestions?

Thanks,
Fanbin

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#gauge
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/create.html#create-table
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#latency-tracking