I'm following the example from this section:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
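That section covers per-partition watermarking. When the WatermarkStrategy is passed to FlinkKafkaConsumer.assignTimestampsAndWatermarks, the consumer generates a watermark for each Kafka partition and emits the minimum across them. Partitions marked idle (via withIdleness) are left out of that minimum. The plain-Java sketch below models only this combining rule. The class and method names are hypothetical and are not Flink API. It is a simplified illustration, not Flink's implementation.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical, simplified model of combining per-partition watermarks (NOT Flink code).
public class PartitionWatermarks {
    private final long[] watermarks;
    private final boolean[] idle;

    public PartitionWatermarks(int partitions) {
        watermarks = new long[partitions];
        idle = new boolean[partitions];
        // No partition has produced a watermark yet.
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    public void update(int partition, long watermark) {
        // Watermarks never move backwards; a new watermark also makes the partition active again.
        watermarks[partition] = Math.max(watermarks[partition], watermark);
        idle[partition] = false;
    }

    public void markIdle(int partition) {
        idle[partition] = true;
    }

    // Combined watermark: minimum over non-idle partitions, empty if every partition is idle.
    public OptionalLong combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int p = 0; p < watermarks.length; p++) {
            if (!idle[p]) {
                anyActive = true;
                min = Math.min(min, watermarks[p]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        PartitionWatermarks wm = new PartitionWatermarks(12);
        wm.update(0, 495_000L);
        // 11 partitions without data and without idleness hold the combined watermark at Long.MIN_VALUE.
        System.out.println(wm.combined().getAsLong());
        for (int p = 1; p < 12; p++) {
            wm.markIdle(p);
        }
        // Once the empty partitions are idle, partition 0 alone drives the watermark.
        System.out.println(wm.combined().getAsLong());
    }
}
```

With 12 partitions and little traffic, this rule means the source's watermark normally lags behind the data rather than running ahead of it.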

On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:

> Other points
> - I'm using the Kafka record timestamp as the event time.
> - The same issue happens even if I configure watermark idleness.
>
> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> There are 12 Kafka partitions (to keep the structure similar to other low
>> traffic environments).
>>
>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> Hi.
>>>
>>> I'm running a backfill from a Kafka topic with very few records spread
>>> across a few days. I'm seeing a case where the records coming from a Kafka
>>> source have a watermark that's more recent (by hours) than their event time.
>>> I haven't seen this before when running this job. It contradicts what I'd
>>> expect the Kafka source to do.
>>>
>>> Example problem:
>>> 1. I have Kafka records at ts=1000, 2000, ... 500000. (These are simplified
>>> values; the real timestamps are further apart.)
>>> 2. My first operator after the FlinkKafkaConsumer sees:
>>>    context.timestamp() = 1000
>>>    context.timerService().currentWatermark() = 500000
>>>
>>> Details about how I'm running this:
>>> - I'm on Flink 1.12.3, running on EKS, with MSK as the source.
>>> - I'm using FlinkKafkaConsumer.
>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),
>>> with no idleness settings.
>>> - I'm running similar code in all environments. The main difference is
>>> that this one has low traffic. I have not been able to reproduce this
>>> outside this environment.
>>>
>>>
>>> I put the following process function right after my Kafka source.
>>>
>>> --------
>>>
>>> AfterSource
>>> ts=1647274892728
>>> watermark=1647575140007
>>> record=...
>>>
>>>
>>> public static class TextLog extends ProcessFunction<Record, Record> {
>>>     // SLF4J logger (assumed; Flink itself logs through SLF4J).
>>>     private static final Logger LOGGER = LoggerFactory.getLogger(TextLog.class);
>>>     private final String label;
>>>     public TextLog(String label) {
>>>         this.label = label;
>>>     }
>>>     @Override
>>>     public void processElement(Record record, Context context, Collector<Record> collector) throws Exception {
>>>         // Log the element's timestamp alongside the operator's current watermark.
>>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>                 label, context.timestamp(), context.timerService().currentWatermark(), record);
>>>         // Forward the record unchanged.
>>>         collector.collect(record);
>>>     }
>>> }
>>>
>>
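For reference, Flink's bounded-out-of-orderness generator tracks the largest timestamp seen so far and periodically emits maxTimestamp - outOfOrderness - 1. The plain-Java sketch below reimplements only that arithmetic. The class name is hypothetical, and this is not Flink's own class. It shows the watermark one would expect around the first record in the example above, assuming a 5 s bound and a single partition read in timestamp order.

```java
// Hypothetical re-implementation of the bounded-out-of-orderness arithmetic (not Flink's class).
public class BoundedLagWatermark {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    public BoundedLagWatermark(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that the first watermark is Long.MIN_VALUE without overflowing.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    public void onEvent(long eventTimestamp) {
        // Only the largest timestamp seen so far matters.
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    public long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedLagWatermark gen = new BoundedLagWatermark(5_000L);
        System.out.println(gen.currentWatermark()); // before any record
        gen.onEvent(1_000L);
        System.out.println(gen.currentWatermark()); // after ts=1000
        gen.onEvent(500_000L);
        System.out.println(gen.currentWatermark()); // after ts=500000
    }
}
```

Under these assumptions, the watermark is -4001 after the ts=1000 record. A value near 500000 (here 494999) only appears once the ts=500000 record has been read.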
