This sounds very much like our issue, thank you! We will follow along with
the bug.

Much appreciated,
- Reem

On Thu, Mar 30, 2023 at 9:20 AM Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi Reem
>
> My thinking is that this might be related to the recently reported
> https://issues.apache.org/jira/browse/FLINK-31632.
>
> Best regards,
>
> Martijn
>
> On Wed, Mar 29, 2023 at 7:07 PM Reem Razak via user <user@flink.apache.org>
> wrote:
>
>> Hey Martijn,
>>
>> The version is 1.16.0
>>
>> On Wed, Mar 29, 2023 at 5:43 PM Martijn Visser <martijnvis...@apache.org>
>> wrote:
>>
>>> Hi Reem,
>>>
>>> What's the Flink version where you're encountering this issue?
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Wed, Mar 29, 2023 at 5:18 PM Reem Razak via user <user@flink.apache.org>
>>> wrote:
>>>
>>>> Hey there!
>>>>
>>>> We are seeing a second Flink pipeline run into similar issues when
>>>> configuring both `withWatermarkAlignment` and `withIdleness`. The
>>>> unexpected behaviour is triggered after a Kafka cluster failover. Any
>>>> thoughts on whether the two are incompatible?
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Nov 9, 2022 at 6:42 PM Reem Razak <reem.ra...@shopify.com>
>>>> wrote:
>>>>
>>>>> Hi there,
>>>>>
>>>>> We are integrating the watermark alignment feature into a pipeline
>>>>> with a Kafka source during a "backfill", i.e. replaying from an earlier
>>>>> Kafka offset. While testing, we noticed some unexpected behaviour in
>>>>> watermark advancement, which was resolved by removing `withIdleness`
>>>>> from our watermark strategy.
>>>>>
>>>>>
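>>>>>     import java.time.Duration
>>>>>     import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>>>>
>>>>>     // Event and TimestampedEventTimestampAssigner are defined elsewhere in the pipeline.
>>>>>     val watermarkStrategy = WatermarkStrategy
>>>>>       .forBoundedOutOfOrderness[Event](Duration.ofMinutes(1))
>>>>>       .withTimestampAssigner(new TimestampedEventTimestampAssigner[Event])
>>>>>       // alignment group name, max allowed watermark drift
>>>>>       .withWatermarkAlignment("alignment-group-1", Duration.ofMinutes(1))
>>>>>       // treat a split as idle after 5 minutes without records
>>>>>       .withIdleness(Duration.ofMinutes(5))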
>>>>>
>>>>> I have attached a couple of screenshots of the `watermarkAlignmentDrift`
>>>>> metric. As you can see, the behaviour seems normal until the value
>>>>> suddenly drops to roughly `Long.MIN_VALUE`, which causes the source to
>>>>> stop emitting records entirely. Furthermore, the logs originating from
>>>>> https://github.com/dawidwys/flink/blob/888162083fbb5f62f809e5270ad968db58cc9c5b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L174
>>>>> also indicate that `maxAllowedWatermark` switches to roughly `Long.MIN_VALUE`.
>>>>>
>>>>> We found that the `updateInterval` passed into the alignment parameters
>>>>> seemed to correlate with how long the pipeline would run before
>>>>> stopping: with an interval of 20 minutes the issue appeared later than
>>>>> with an interval of 1 second.
>>>>>
>>>>> We are wondering whether there is a bug when using both `withIdleness`
>>>>> and `withWatermarkAlignment`. Might it be related to
>>>>> https://issues.apache.org/jira/browse/FLINK-28975, or is there possibly
>>>>> a race condition in watermark emission? We do not necessarily need both
>>>>> configured at the same time, but we were still surprised by the
>>>>> behaviour of the application. Has anyone run into a similar issue, or
>>>>> does anyone have further insight?
>>>>>
>>>>> Much appreciated,
>>>>> - Reem
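One possible explanation for `watermarkAlignmentDrift` and `maxAllowedWatermark` collapsing to roughly `Long.MIN_VALUE` is signed 64-bit overflow. This is an assumption, not something confirmed in this thread or in FLINK-31632. Suppose the coordinator derives the maximum allowed watermark as the group's minimum reported watermark plus the maximum drift, and suppose an idle reader reports `Long.MAX_VALUE`. If every reader in the group goes idle (for example, because no partition produces records for longer than the five-minute idleness timeout during a Kafka failover), that sum wraps around. The plain-Scala sketch below models only this arithmetic; `AlignmentModel`, `Reader`, and the idle-reporting rule are hypothetical and are not Flink classes.

```scala
// Simplified model of watermark alignment arithmetic; NOT Flink's implementation.
// Assumption: an idle reader reports Long.MaxValue so it never holds back the group.
object AlignmentModel {
  final case class Reader(watermark: Long, idle: Boolean) {
    def reported: Long = if (idle) Long.MaxValue else watermark
  }

  // Combined watermark of an alignment group: the minimum reported watermark.
  def combined(readers: Seq[Reader]): Long = {
    require(readers.nonEmpty, "alignment group has no readers")
    readers.map(_.reported).min
  }

  // Naive bound: plain Long addition, which silently wraps on overflow.
  def maxAllowedNaive(readers: Seq[Reader], driftMs: Long): Long =
    combined(readers) + driftMs

  // Saturating bound: clamp at Long.MaxValue instead of wrapping (assumes driftMs >= 0).
  def maxAllowedSaturating(readers: Seq[Reader], driftMs: Long): Long = {
    val c = combined(readers)
    if (c > Long.MaxValue - driftMs) Long.MaxValue else c + driftMs
  }

  def main(args: Array[String]): Unit = {
    val driftMs = 60000L // one minute, matching withWatermarkAlignment(..., Duration.ofMinutes(1))
    val active = Seq(Reader(1000000L, idle = false), Reader(1030000L, idle = false))
    val allIdle = Seq(Reader(1000000L, idle = true), Reader(1030000L, idle = true))

    println(maxAllowedNaive(active, driftMs))       // 1060000
    println(maxAllowedNaive(allIdle, driftMs))      // -9223372036854715809
    println(maxAllowedSaturating(allIdle, driftMs)) // 9223372036854775807
  }
}
```

In this model the first value is an ordinary bound. The second sits just above `Long.MIN_VALUE`, below any real event-time watermark, so any reader that resumes would be held back, which would match a source that no longer emits records. The saturating variant clamps at `Long.MAX_VALUE` instead of wrapping.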
