BTW, it seems I spoke too soon in my previous email. I left the job running overnight with each source in its own alignment group, so that only per-split alignment applies, and eventually some partitions never resumed consumption and the consumer lag increased.
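The rule behind per-split alignment can be modeled in a few lines. A reader pauses any split (here, a Kafka partition) whose own watermark is ahead of the latest maxAllowedWatermark announced for its group, and resumes it once the announced value catches up. The following is a simplified, standalone model of that rule. The class `SplitAlignmentModel`, the method `splitsToPause`, and the split IDs are hypothetical and are not Flink's API. It shows why a partition would stay paused indefinitely if the announced maxAllowedWatermark stops advancing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified model of per-split watermark alignment (not Flink code). */
public class SplitAlignmentModel {

    /** Returns the IDs of splits whose watermark is ahead of maxAllowedWatermark, i.e. splits to pause. */
    static List<String> splitsToPause(Map<String, Long> splitWatermarks, long maxAllowedWatermark) {
        List<String> paused = new ArrayList<>();
        for (Map.Entry<String, Long> e : splitWatermarks.entrySet()) {
            if (e.getValue() > maxAllowedWatermark) {
                paused.add(e.getKey());
            }
        }
        return paused;
    }

    public static void main(String[] args) {
        // TreeMap keeps the hypothetical split IDs in a deterministic order.
        Map<String, Long> splits = new TreeMap<>();
        splits.put("topic-0", 1_000L);
        splits.put("topic-1", 7_000L);
        splits.put("topic-2", 6_000L);

        // Slowest watermark 1_000 plus a 5_000 ms drift gives 6_000.
        System.out.println(splitsToPause(splits, 6_000L)); // [topic-1]

        // Only a larger announced value lets topic-1 resume; if the value
        // never advances, topic-1 stays paused.
        System.out.println(splitsToPause(splits, 8_000L)); // []
    }
}
```

The model deliberately ignores idleness and the coordinator round trip; it only captures the pause condition (split watermark strictly greater than the announced maximum).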
Regards,
Alexis.

On Thu, 29 Jun 2023 at 10:08, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
> Hi Martijn,
>
> thanks for the pointers. I think the issue I'm seeing is not caused by
> those, because in my case the watermarks are not negative. Some more
> information about my setup in case it's relevant:
>
> - All Kafka topics have 6 partitions.
> - Job parallelism is 2, but 2 of the Kafka sources are hard-coded to
>   parallelism=1.
>
> Regards,
> Alexis.
>
> On Thu, 29 Jun 2023 at 10:00, Martijn Visser <martijnvis...@apache.org> wrote:
>> Hi Alexis,
>>
>> There are a couple of recent Flink tickets on watermark alignment,
>> specifically https://issues.apache.org/jira/browse/FLINK-32414 and
>> https://issues.apache.org/jira/browse/FLINK-32420. Could the latter also
>> be applicable in your case?
>>
>> Best regards,
>>
>> Martijn
>>
>> On Wed, Jun 28, 2023 at 11:33 AM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>> Hello,
>>>
>>> just for completeness, I don't see the problem if I assign a different
>>> alignment group to each source, i.e. using only split-level watermark
>>> alignment.
>>>
>>> Regards,
>>> Alexis.
>>>
>>> On Wed, 28 Jun 2023 at 08:13, haishui <haishui...@126.com> wrote:
>>>> Hi,
>>>> I have the same problem. This really is a bug.
>>>> `shouldWaitForAlignment` also needs to be changed.
>>>>
>>>> By the way, a source will be marked as idle when it has been
>>>> waiting for alignment for a long time. Is this a bug?
>>>>
>>>> On 2023-06-27 23:25:38, "Alexis Sarda-Espinosa" <sarda.espin...@gmail.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I am currently evaluating idleness and alignment with Flink 1.17.1 and
>>>> the externalized Kafka connector. My job has 3 sources whose watermark
>>>> strategies are defined like this:
>>>>
>>>>     WatermarkStrategy.<T>forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>>>>         .withIdleness(idleTimeout)
>>>>         .withWatermarkAlignment("group", maxAllowedWatermarkDrift, Duration.ofSeconds(1L))
>>>>
>>>> The max allowed drift is currently 5 seconds, and my sources have idle
>>>> timeouts of 1, 1.5, and 5 seconds respectively.
>>>>
>>>> What I observe is that, when I restart the job, all sources publish
>>>> messages, but then 2 of them are marked as idle and never resume. I found
>>>> https://issues.apache.org/jira/browse/FLINK-31632, which should be
>>>> fixed in 1.17.1, but I don't think it's the same issue, since my logs
>>>> don't show negative values:
>>>>
>>>> 2023-06-27 15:11:42,927 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from subTaskId=1
>>>> 2023-06-27 15:11:43,009 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
>>>> 2023-06-27 15:11:43,091 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
>>>> 2023-06-27 15:11:43,116 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
>>>> 2023-06-27 15:11:43,298 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
>>>> 2023-06-27 15:11:43,304 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>>> 2023-06-27 15:11:43,306 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>>> 2023-06-27 15:11:43,486 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>> 2023-06-27 15:11:43,489 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>> 2023-06-27 15:11:43,492 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>>
>>>> Does anyone know if I'm missing something, or whether this is really a bug?
>>>>
>>>> Regards,
>>>> Alexis.
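The numbers in the log are consistent with the announced value being the smallest reported watermark in the group plus the 5-second drift: 1687878696690 + 5000 = 1687878701690. Subtask 0's watermark of Long.MAX_VALUE (9223372036854775807) does not lower that minimum. The sketch below reproduces this arithmetic under that assumption. It is a simplified model, not the actual SourceCoordinator code. The class `MaxAllowedWatermarkModel` and its saturating-add guard are hypothetical; the guard only shows how a plain addition would otherwise wrap to a negative value when every watermark is Long.MAX_VALUE.

```java
import java.util.Collection;
import java.util.List;

/** Hypothetical model of deriving maxAllowedWatermark for one alignment group (not Flink code). */
public class MaxAllowedWatermarkModel {

    static long maxAllowedWatermark(Collection<Long> reportedWatermarks, long maxDriftMillis) {
        // Minimum over the latest watermark reported by each subtask in the group.
        long min = Long.MAX_VALUE;
        for (long wm : reportedWatermarks) {
            min = Math.min(min, wm);
        }
        // Saturate instead of overflowing (min + drift would wrap to a negative
        // number when min is Long.MAX_VALUE, e.g. if every subtask reports it).
        return (min > Long.MAX_VALUE - maxDriftMillis) ? Long.MAX_VALUE : min + maxDriftMillis;
    }

    public static void main(String[] args) {
        long drift = 5_000L; // maxAllowedWatermarkDrift of 5 seconds, as in the thread
        // Watermarks from the log: subTaskId=1 and subTaskId=0 (Long.MAX_VALUE).
        List<Long> reported = List.of(1_687_878_696_690L, Long.MAX_VALUE);
        System.out.println(maxAllowedWatermark(reported, drift)); // 1687878701690
        System.out.println(maxAllowedWatermark(List.of(Long.MAX_VALUE), drift)); // 9223372036854775807
    }
}
```

The model only reproduces the logged values. It does not by itself explain why the idle sources never resume. Under this model, the repeated announcement of the same value (1687878701690) would keep any split whose watermark is already past it paused.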