[ https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752905#comment-17752905 ]
Grzegorz Liter commented on FLINK-32828: ---------------------------------------- [~mason6345] it's not about idle partitions; it's about quite the opposite situation. During normal operation, all partitions are active and more or less aligned by timestamp. Now imagine the situation during startup when one thread starts consuming some partitions faster than others. If it progresses through events from that single partition fast enough to reach events whose timestamps exceed some events in the other partitions by more than the allowed watermark delay, it will start dropping those other events as late. Kafka partition-aware watermarks were implemented precisely to mitigate this situation. There is a clear bug here: a freshly started job, or a job that has settled and has traffic on all partitions, works correctly, meaning the watermark emitted by the source is the minimum watermark over all partitions. But in a short window just after startup from a checkpoint/savepoint, the watermark incorrectly progresses based on traffic from a single partition, where it should wait for traffic on all partitions and take the minimum watermark over all of them. Please note that the logs are from the minimal example I created (attached to the ticket) and the events are sent by hand. In production, restarting the job causes around 1-10% of events to be dropped at the very start of the job, even though, within the scope of a single partition, the out-of-orderness is well below the allowed one.
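The intended behaviour described above (the source-level watermark is the minimum over the per-partition watermarks, so a single fast partition cannot advance it on its own) can be sketched as follows. This is a minimal illustration, not Flink's actual implementation; the class and method names are invented for the example, and each partition's watermark is simply the maximum timestamp seen minus the allowed out-of-orderness, in the spirit of `forBoundedOutOfOrderness`.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch (not Flink's real code) of partition-aware watermark combination:
 * each Kafka partition tracks its own watermark, and the source-level
 * watermark is the minimum across all known partitions.
 */
public class PartitionAwareWatermarks {
    private final long outOfOrdernessMillis;
    // Per-partition maximum event timestamp seen so far.
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();

    public PartitionAwareWatermarks(Duration outOfOrderness, int numPartitions) {
        this.outOfOrdernessMillis = outOfOrderness.toMillis();
        // Until a partition has seen an event, its watermark stays at
        // Long.MIN_VALUE, which is what holds the combined watermark back.
        for (int p = 0; p < numPartitions; p++) {
            maxTimestampPerPartition.put(p, Long.MIN_VALUE);
        }
    }

    /** Record an event read from one partition. */
    public void onEvent(int partition, long timestampMillis) {
        maxTimestampPerPartition.merge(partition, timestampMillis, Math::max);
    }

    /** Combined watermark = minimum over per-partition watermarks. */
    public long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long maxTs : maxTimestampPerPartition.values()) {
            long partitionWatermark =
                    maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - outOfOrdernessMillis;
            min = Math.min(min, partitionWatermark);
        }
        return min;
    }

    public static void main(String[] args) {
        PartitionAwareWatermarks wm =
                new PartitionAwareWatermarks(Duration.ofSeconds(10), 2);

        // Traffic on partition 0 only: partition 1 still holds the watermark back.
        wm.onEvent(0, 100_000L);
        System.out.println(wm.currentWatermark()); // Long.MIN_VALUE

        // Once partition 1 sees an event, the watermark is the minimum of both.
        wm.onEvent(1, 50_000L);
        System.out.println(wm.currentWatermark()); // 50_000 - 10_000 = 40_000
    }
}
```

The bug report amounts to saying that, shortly after a restore from checkpoint/savepoint, the source behaves as if only the partitions with fresh traffic were in the map, so the minimum is taken over too few partitions and the watermark jumps ahead.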
> Kafka partition-aware watermark not handled correctly shortly after job startup from checkpoint or savepoint
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32828
>                 URL: https://issues.apache.org/jira/browse/FLINK-32828
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Connectors / Kafka
>    Affects Versions: 1.17.1
>         Environment: Affected environments:
> * Local MiniCluster + Confluent Kafka run in Docker
> ** See attached files
> * Flink job run in Kubernetes using Flink Operator 1.15 + 3-node Kafka cluster run in a Kubernetes cluster
>            Reporter: Grzegorz Liter
>            Priority: Major
>         Attachments: docker-compose.yml, test-job.java
>
> When using KafkaSource with partition-aware watermarks, watermarks are being emitted even when only one partition has events, just after job startup from a savepoint/checkpoint. Once there are events on the other partitions, the watermark behaviour is correct and the watermark is emitted as the minimum watermark across all partitions.
>
> Steps to reproduce:
> # Set up a Kafka cluster with a topic that has 2 or more partitions.
(see attached docker-compose.yml)
> ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-2 --partitions 4}}
> # Create a job that (see attached `test-job.java`):
> ## uses a KafkaSource with `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L))`
> ## has parallelism lower than the number of partitions
> ## stores a checkpoint/savepoint
> # Start the job.
> # Send events only on a single partition.
> ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-2 --property "parse.key=true" --property "key.separator=:"}}
>
> {{14:51:19,883 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job.
> 6. Start the job from the last checkpoint/savepoint.
> 7. Send events only on a single partition.
> {{14:53:41,693 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff0000}*Actual: Watermark has progressed*{color}
>
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. 
Send events on other partitions, and then send events only on a single partition.{color}
> {{{color:#172b4d}14:54:55,112 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/0: 2 -> a, timestamp 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 2023-08-10T12:54:44.103Z{color}}}
> {color:#172b4d}Expected: Watermark should progress a bit and then should not progress when receiving events only on a single partition.{color}
> {color:#172b4d}Actual: As expected.{color}
>
> {color:#172b4d}This behaviour also shows up as a burst of late events just after startup and then no more late events once the job operates normally.{color}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)