Hi Reem,

I think this might be related to the recently reported
https://issues.apache.org/jira/browse/FLINK-31632.
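
As a rough illustration of how a `maxAllowedWatermark` near Long.MIN_VALUE
could arise (an assumption, not something confirmed in this thread): if an
idle source reports Long.MaxValue as its watermark and the allowed drift is
added with plain Long arithmetic, the sum wraps around to a very negative
value. The object and helper names below are hypothetical and are not Flink
APIs; the saturating variant is only one possible way to avoid the wrap.

```scala
import java.time.Duration

object WatermarkOverflowSketch {
  // Hypothetical helper: plain Long addition, which silently wraps on overflow.
  def maxAllowedPlain(minWatermark: Long, driftMs: Long): Long =
    minWatermark + driftMs

  // Hypothetical helper: saturating addition, assuming driftMs is non-negative.
  // The result is clamped to Long.MaxValue instead of wrapping around.
  def maxAllowedSaturating(minWatermark: Long, driftMs: Long): Long =
    if (minWatermark > Long.MaxValue - driftMs) Long.MaxValue
    else minWatermark + driftMs

  def main(args: Array[String]): Unit = {
    // Same drift as in the quoted strategy: one minute, in milliseconds.
    val driftMs = Duration.ofMinutes(1).toMillis

    // Assumption: an idle source is represented by Long.MaxValue.
    val idleWatermark = Long.MaxValue

    // Wraps around and lands just above Long.MinValue.
    println(maxAllowedPlain(idleWatermark, driftMs))

    // Stays at Long.MaxValue.
    println(maxAllowedSaturating(idleWatermark, driftMs))
  }
}
```

With a wrapped value like this, every source whose watermark is above it
would look "too far ahead" to the alignment check, which would be consistent
with the source no longer emitting records.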

Best regards,

Martijn

On Wed, Mar 29, 2023 at 7:07 PM Reem Razak via user <user@flink.apache.org>
wrote:

> Hey Martijn,
>
> The version is 1.16.0
>
> On Wed, Mar 29, 2023 at 5:43 PM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi Reem,
>>
>> What's the Flink version where you're encountering this issue?
>>
>> Best regards,
>>
>> Martijn
>>
>> On Wed, Mar 29, 2023 at 5:18 PM Reem Razak via user <
>> user@flink.apache.org> wrote:
>>
>>> Hey there!
>>>
>>> We are seeing a second Flink pipeline encountering similar issues when
>>> configuring both `withWatermarkAlignment` and `withIdleness`. The
>>> unexpected behaviour gets triggered after a Kafka cluster failover. Any
>>> thoughts on there being an incompatibility between the two?
>>>
>>> Thanks!
>>>
>>> On Wed, Nov 9, 2022 at 6:42 PM Reem Razak <reem.ra...@shopify.com>
>>> wrote:
>>>
>>>> Hi there,
>>>>
>>>> We are integrating the watermark alignment feature into a pipeline with
>>>> a Kafka source during a "backfill", i.e. playing from an earlier Kafka
>>>> offset. While testing, we noticed some unexpected behaviour in the
>>>> watermark advancement which was resolved by removing `withIdleness` from
>>>> our watermark strategy.
>>>>
>>>>
>>>>     val watermarkStrategy = WatermarkStrategy
>>>>       .forBoundedOutOfOrderness(Duration.ofMinutes(1))
>>>>       .withTimestampAssigner(new TimestampedEventTimestampAssigner[Event])
>>>>       .withWatermarkAlignment("alignment-group-1", Duration.ofMinutes(1))
>>>>       .withIdleness(Duration.ofMinutes(5))
>>>>
>>>> I have attached a couple of screenshots of the watermarkAlignmentDrift
>>>> metric. As you can see, the behaviour seems normal until a sudden drop in
>>>> the value to ~ Long.MIN_VALUE, which causes the pipeline to stop emitting
>>>> records completely from the source. Furthermore, the logs originating from
>>>> https://github.com/dawidwys/flink/blob/888162083fbb5f62f809e5270ad968db58cc9c5b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L174
>>>> also indicate that the `maxAllowedWatermark` switches to ~ Long.MIN_VALUE.
>>>>
>>>> We found that modifying the `updateInterval` passed into the alignment
>>>> parameters seemed to correlate with how long the pipeline would operate
>>>> before stopping - a larger interval of 20 minutes would encounter the issue
>>>> later than an interval of 1 second.
>>>>
>>>> We are wondering if a bug exists when using both `withIdleness` and
>>>> `withWatermarkAlignment`. Might it be related to
>>>> https://issues.apache.org/jira/browse/FLINK-28975, or is there
>>>> possibly a race condition in the watermark emission? We do not necessarily
>>>> need to have both configured at the same time, but we were also surprised
>>>> by the behaviour of the application. Has anyone run into a similar issue, or
>>>> does anyone have further insight?
>>>>
>>>> Much Appreciated,
>>>> - Reem
>>>>
>>>>
>>>>
>>>>
