Hi Frank,

I'm not sure exactly what you are trying to accomplish, but yes: in
the TimestampAssigner you can only return the new timestamp for the
given record. You cannot modify the record itself there.

If you want to use "ingestion time" - "true event time" as some kind of
delay metric, you will indeed need to have both of them calculated
somewhere. You could:
1. As you described, first use the ingestion time assigner, then a mapper
function to copy that timestamp into a separate field, then reassign the
true event time and calculate the delay.
2. Or you could simply assign the correct event time and, in a single
mapper chained directly to the source, use for example
`System.currentTimeMillis() - eventTime` to calculate this delay in a
single step. After all, that's more or less what Flink is doing to
calculate the ingestion time [1].
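
To illustrate option 2, here is a rough sketch in plain Java, without any
Flink dependencies, of the logic such a mapper could contain. The names
`Event`, `EventWithDelay` and `withDelay` are made up for this example and
are not Flink APIs. In a real job the body of `withDelay` would sit inside
a map function chained to the source. The clock is passed in only so that
the logic can be tested with a fixed time.

```java
import java.util.function.LongSupplier;

class DeliveryDelaySketch {

    /** Hypothetical input record: raw JSON plus its parsed "timestamp" field. */
    static final class Event {
        final String json;
        final long eventTimeMillis;

        Event(String json, long eventTimeMillis) {
            this.json = json;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    /** Hypothetical output record: the original event plus the measured delay. */
    static final class EventWithDelay {
        final Event event;
        final long deliveryDelayMillis;

        EventWithDelay(Event event, long deliveryDelayMillis) {
            this.event = event;
            this.deliveryDelayMillis = deliveryDelayMillis;
        }
    }

    /**
     * Computes "now - eventTime" in a single step. When called right after
     * the source, "now" approximates the ingestion time.
     */
    static EventWithDelay withDelay(Event event, LongSupplier clock) {
        long delay = clock.getAsLong() - event.eventTimeMillis;
        return new EventWithDelay(event, delay);
    }

    public static void main(String[] args) {
        // Pretend the message was generated 250 ms ago.
        Event event = new Event("{}", System.currentTimeMillis() - 250);
        EventWithDelay result = withDelay(event, System::currentTimeMillis);
        System.out.println("delay in ms: " + result.deliveryDelayMillis);
    }
}
```

Note that this measures the delay up to the moment the mapper runs, so it
only approximates the ingestion time as long as the mapper stays chained
to the source and the job is keeping up with the stream.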

Best, Piotrek

[1]
https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-core/src/main/java/org/apache/flink/api/common/eventtime/IngestionTimeAssigner.java

On Wed, 16 Feb 2022 at 09:46, Frank Dekervel <fr...@kapernikov.com> wrote:

> Hello,
>
> I'm getting messages from a kafka stream. The messages are JSON records
> with a "timestamp" key in the json. This timestamp key contains the time
> at which the message was generated. Now I'd like to know whether these
> messages had a delivery delay (e.g. delay between message generation and
> arrival in kafka). So I don't want to have the "full" delay (e.g.
> difference between generation time and processing time), just the
> delivery delay.
>
> In my timestamp assigner i get a "long" with the original timestamp as
> an argument, but i cannot yield an updated record from the timestamp
> assigner (eg with an extra field "deliveryDelay" or so).
>
> So i guess my only option is to not specify the timestamp/watermark
> extractor in the env.fromSource, then first mapping the stream to add a
> lateness field and only after that reassign timestamps/watermarks ... is
> that right ?
>
> Thanks!
>
> Greetings,
> Frank