BTW, it seems I spoke too soon in my previous email. I left the job running
overnight with each source in its own alignment group, to evaluate per-split
alignment only, and eventually some partitions stopped being consumed and
never resumed, so the consumer lag increased.

Regards,
Alexis.

Am Do., 29. Juni 2023 um 10:08 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Hi Martijn,
>
> Thanks for the pointers. I don't think the issue I'm seeing is caused by
> those, because in my case the watermarks are not negative. Some more
> information about my setup, in case it's relevant:
>
> - All Kafka topics have 6 partitions.
> - Job parallelism is 2, but 2 of the Kafka sources are hard-coded to
> parallelism=1.
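To make the layout above concrete for per-split alignment, here is a sketch of how 6 partitions could be spread across reader subtasks. The owner formula (topic-dependent start index, then round-robin over partitions) is my assumption, modeled on how I understand the Kafka source enumerator assigns splits; `PartitionAssignmentSketch` and the topic name `topic-a` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of Kafka partition-to-reader assignment.
// Assumption: owner = (topic-dependent start index + partition) % numReaders.
class PartitionAssignmentSketch {

    static int splitOwner(String topic, int partition, int numReaders) {
        // Mask keeps the start index non-negative even if hashCode() * 31 is negative.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    // Returns, for each reader subtask, the list of partitions it owns.
    static List<List<Integer>> assign(String topic, int numPartitions, int numReaders) {
        List<List<Integer>> perReader = new ArrayList<>();
        for (int r = 0; r < numReaders; r++) {
            perReader.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            perReader.get(splitOwner(topic, p, numReaders)).add(p);
        }
        return perReader;
    }

    public static void main(String[] args) {
        // "topic-a" is a placeholder topic name.
        System.out.println("parallelism=2: " + assign("topic-a", 6, 2));
        System.out.println("parallelism=1: " + assign("topic-a", 6, 1));
    }
}
```

Under this assumption, parallelism 2 gives each reader 3 partitions, while a source hard-coded to parallelism 1 holds all 6 splits in a single reader, so any per-split pausing happens inside that one reader.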
>
> Regards,
> Alexis.
>
> Am Do., 29. Juni 2023 um 10:00 Uhr schrieb Martijn Visser <
> martijnvis...@apache.org>:
>
>> Hi Alexis,
>>
>> There are a couple of recent Flink tickets on watermark alignment,
>> specifically https://issues.apache.org/jira/browse/FLINK-32414 and
>> https://issues.apache.org/jira/browse/FLINK-32420. Could the latter also
>> apply in your case?
>>
>> Best regards,
>>
>> Martijn
>>
>> On Wed, Jun 28, 2023 at 11:33 AM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> just for completeness, I don't see the problem if I assign a different
>>> alignment group to each source, i.e. using only split-level watermark
>>> alignment.
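A simplified model of why separate groups avoid cross-source throttling: within one alignment group, the coordinator takes the minimum watermark and adds the max drift, and anything ahead of that limit has to pause. This is not Flink code; `AlignmentGroupSketch` is hypothetical, reduces each source to a single watermark, and ignores alignment between subtasks of the same source.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of group-level watermark alignment (not Flink code).
// Assumption: each source is reduced to one watermark; per-subtask alignment is ignored.
class AlignmentGroupSketch {

    record Source(String name, String group, long watermark) {}

    // Per group: the smallest watermark plus the allowed drift.
    static Map<String, Long> maxAllowedPerGroup(List<Source> sources, long maxDriftMs) {
        Map<String, Long> minPerGroup = new HashMap<>();
        for (Source s : sources) {
            minPerGroup.merge(s.group(), s.watermark(), Math::min);
        }
        Map<String, Long> maxAllowed = new HashMap<>();
        minPerGroup.forEach((group, min) -> maxAllowed.put(group, min + maxDriftMs));
        return maxAllowed;
    }

    // A source ahead of its group's limit would stop emitting until the others catch up.
    static boolean mustPause(Source s, Map<String, Long> maxAllowed) {
        return s.watermark() > maxAllowed.get(s.group());
    }

    public static void main(String[] args) {
        long drift = 5_000L; // 5 seconds, as in the original configuration
        List<Source> shared = List.of(
                new Source("a", "group", 10_000L),
                new Source("b", "group", 20_000L));
        List<Source> separate = List.of(
                new Source("a", "group-a", 10_000L),
                new Source("b", "group-b", 20_000L));
        System.out.println("shared group, b pauses: "
                + mustPause(shared.get(1), maxAllowedPerGroup(shared, drift)));
        System.out.println("separate groups, b pauses: "
                + mustPause(separate.get(1), maxAllowedPerGroup(separate, drift)));
    }
}
```

In the shared group, source `b` is 10 s ahead of `a` and exceeds the 15 s limit, so it pauses; with one group per source, each source only competes with itself.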
>>>
>>> Regards,
>>> Alexis.
>>>
>>> Am Mi., 28. Juni 2023 um 08:13 Uhr schrieb haishui <haishui...@126.com>:
>>>
>>>> Hi,
>>>> I have the same problem. I think this really is a bug, and
>>>> `shouldWaitForAlignment` also needs to be changed.
>>>>
>>>> By the way, a source is marked as idle when it has been waiting for
>>>> alignment for a long time. Is this a bug?
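One plausible mechanism, sketched under assumptions: `withIdleness` marks a source idle when no records arrive within the idle timeout, and a split that is paused for alignment emits no records, so it can look idle to that check. `IdlenessSketch` is a hypothetical, simplified stand-in for Flink's idleness tracking, not its actual implementation.

```java
// Hypothetical, simplified idleness tracker (assumption: not Flink's real class).
// Idle = no record seen for at least idleTimeoutMs at the time of a periodic check.
class IdlenessSketch {
    private final long idleTimeoutMs;
    private long lastActivityMs;
    private boolean idle;

    IdlenessSketch(long idleTimeoutMs, long nowMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastActivityMs = nowMs;
    }

    // Any record resets the timer and clears idleness.
    void onEvent(long nowMs) {
        lastActivityMs = nowMs;
        idle = false;
    }

    // Called periodically, e.g. alongside periodic watermark emission.
    boolean checkIdle(long nowMs) {
        if (nowMs - lastActivityMs >= idleTimeoutMs) {
            idle = true;
        }
        return idle;
    }

    public static void main(String[] args) {
        IdlenessSketch split = new IdlenessSketch(1_000L, 0L); // 1 s idle timeout
        split.onEvent(100L); // last record before the split is paused for alignment
        // While paused, no records arrive, so only the periodic checks run.
        System.out.println(split.checkIdle(600L));   // false: 500 ms without records
        System.out.println(split.checkIdle(1_200L)); // true: paused long enough to look idle
    }
}
```

If this model is roughly right, a short idle timeout (1 s in the configuration discussed in this thread) makes it easy for a paused split to be reported as idle.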
>>>>
>>>> On 2023-06-27 23:25:38, "Alexis Sarda-Espinosa" <sarda.espin...@gmail.com>
>>>> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I am currently evaluating idleness and alignment with Flink 1.17.1 and
>>>> the externalized Kafka connector. My job has 3 sources whose watermark
>>>> strategies are defined like this:
>>>>
>>>> WatermarkStrategy.<T>forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>>>>             .withIdleness(idleTimeout)
>>>>             .withWatermarkAlignment("group", maxAllowedWatermarkDrift,
>>>>                     Duration.ofSeconds(1L))
>>>>
>>>> The max allowed drift is currently 5 seconds, and my sources have an
>>>> idleTimeout of 1, 1.5, and 5 seconds.
>>>>
>>>> What I observe is that, when I restart the job, all sources publish
>>>> messages, but then 2 of them are marked as idle and never resume. I found
>>>> https://issues.apache.org/jira/browse/FLINK-31632, which should be
>>>> fixed in 1.17.1, but I don't think it's the same issue; my logs don't show
>>>> negative values:
>>>>
>>>> 2023-06-27 15:11:42,927 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from subTaskId=1
>>>> 2023-06-27 15:11:43,009 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
>>>> 2023-06-27 15:11:43,091 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
>>>> 2023-06-27 15:11:43,116 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
>>>> 2023-06-27 15:11:43,298 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
>>>> 2023-06-27 15:11:43,304 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>>> 2023-06-27 15:11:43,306 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>>> 2023-06-27 15:11:43,486 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>> 2023-06-27 15:11:43,489 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>> 2023-06-27 15:11:43,492 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
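The numbers in these logs can be checked by hand. Assuming maxAllowedWatermark = min(reported watermarks) + maxAllowedWatermarkDrift, the watermark from subTaskId=1 (1687878696690) plus 5 seconds gives exactly the distributed 1687878701690. The sketch also shows why a reported Long.MAX_VALUE (9223372036854775807, as sent by subTaskId=0) needs care: if every reported watermark were Long.MAX_VALUE, adding the drift naively would wrap around to a negative number. That this is where the negative values in FLINK-31632 came from is my assumption; `MaxAllowedWatermarkSketch` is hypothetical.

```java
// Hypothetical sketch of the coordinator-side arithmetic (assumption, not Flink code):
// maxAllowedWatermark = min(reported watermarks) + maxAllowedWatermarkDrift.
class MaxAllowedWatermarkSketch {

    static long minOf(long[] reported) {
        long min = Long.MAX_VALUE;
        for (long w : reported) {
            min = Math.min(min, w);
        }
        return min;
    }

    // Naive version: overflows when the minimum is close to Long.MAX_VALUE.
    static long naive(long[] reported, long maxDriftMs) {
        return minOf(reported) + maxDriftMs;
    }

    // Saturating version: clamps to Long.MAX_VALUE instead of wrapping around.
    static long saturating(long[] reported, long maxDriftMs) {
        long min = minOf(reported);
        return min > Long.MAX_VALUE - maxDriftMs ? Long.MAX_VALUE : min + maxDriftMs;
    }

    public static void main(String[] args) {
        long drift = 5_000L; // maxAllowedWatermarkDrift of 5 seconds
        long[] fromLog = {1687878696690L, Long.MAX_VALUE}; // subTaskId=1, subTaskId=0
        System.out.println(naive(fromLog, drift)); // 1687878701690, as in the log
        long[] allMax = {Long.MAX_VALUE, Long.MAX_VALUE};
        System.out.println(naive(allMax, drift));      // wraps to a negative value
        System.out.println(saturating(allMax, drift)); // 9223372036854775807
    }
}
```

Here one subtask still reports a real watermark, so the minimum stays small and no overflow occurs, which is consistent with the logs showing no negative values.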
>>>>
>>>> Does anyone know if I'm missing something, or if this is really a bug?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>
