[ https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752901#comment-17752901 ]
Mason Chen edited comment on FLINK-32828 at 8/10/23 6:45 PM:
-------------------------------------------------------------
From the logs, it looks like you have a key-by, and the watermark is progressing because that one active partition is moving data to all the other operators. I would start by setting and tuning the idleness ([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources]) to account for the idle partitions.

was (Author: mason6345):
From the logs, it looks like you have a key-by, and the watermark is progressing because that one active partition is moving data to all the other operators. I would start by setting and tuning the [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources] to account for the idle partitions.

> Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32828
>                 URL: https://issues.apache.org/jira/browse/FLINK-32828
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Connectors / Kafka
>    Affects Versions: 1.17.1
>         Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + a 3-node Kafka cluster run in a Kubernetes cluster
>            Reporter: Grzegorz Liter
>            Priority: Major
>         Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are emitted even when only one partition has events, just after job startup from a savepoint/checkpoint. Once events arrive on the other partitions, the watermark behaviour is correct: the watermark is emitted as the minimum watermark across partitions.
>
> Steps to reproduce:
> # Set up a Kafka cluster with a topic that has 2 or more partitions (see attached docker-compose.yml):
> ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-2 --partitions 4}}
> # Create a job that (see attached test-job.java):
> ## uses a KafkaSource with {{WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L))}}
> ## has parallelism lower than the number of partitions
> ## stores a checkpoint/savepoint
> # Start the job
> # Send events only on a single partition:
> ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-2 --property "parse.key=true" --property "key.separator=:"}}
>
> {{14:51:19,883 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Start the job from the last checkpoint/savepoint
> 7.
> Send events only on a single partition:
> {{14:53:41,693 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff0000}*Actual: Watermark has progressed*{color}
>
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on the other partitions, and then send events only on a single partition{color}
> {{{color:#172b4d}14:54:55,112 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/0: 2 -> a, timestamp 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 2023-08-10T12:54:44.103Z{color}}}
> {color:#172b4d}Expected: Watermark should progress a bit and then should not progress once events arrive only on a single partition.{color}
> {color:#172b4d}Actual: As expected{color}
>
>
> {color:#172b4d}This behavior also shows up as a burst of late events just after startup, and then no more late events once the job operates normally.{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
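[Editor's note] For readers of this issue, the expected behavior discussed above follows from how a partition-aware source combines watermarks: the operator-level watermark is, roughly, the minimum of the per-partition watermarks, with partitions marked idle excluded from that minimum (idleness is what the comment suggests configuring via `WatermarkStrategy.withIdleness(...)`). The following is a minimal standalone model of that rule — it is illustrative only, not Flink source code, and the names `combine`/`CombinedWatermark` are made up for this sketch:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Illustrative model (not Flink code) of per-partition watermark combination:
// the combined watermark is the minimum over all partitions not marked idle.
public class CombinedWatermark {

    // Long.MIN_VALUE plays the role of "no watermark yet", which is what
    // renders as -292275055-05-16T16:47:04.192Z in the logs above.
    static long combine(Map<Integer, Long> perPartition, Set<Integer> idle) {
        return perPartition.entrySet().stream()
                .filter(e -> !idle.contains(e.getKey()))   // idle partitions are excluded
                .mapToLong(Map.Entry::getValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        Map<Integer, Long> wm = new LinkedHashMap<>();
        wm.put(0, Long.MIN_VALUE); // partitions 0-2: no events yet
        wm.put(1, Long.MIN_VALUE);
        wm.put(2, Long.MIN_VALUE);
        wm.put(3, 1_691_672_008_849L); // partition 3: active

        // Correct behavior (steps 1-4): empty partitions hold the combined
        // watermark at MIN_VALUE, so it must not progress.
        System.out.println(combine(wm, Set.of())); // prints Long.MIN_VALUE

        // With idleness configured, empty partitions are excluded and the
        // watermark may legitimately advance. The bug reported here is that
        // after restore this advancement happens even WITHOUT idleness.
        System.out.println(combine(wm, Set.of(0, 1, 2))); // prints 1691672008849
    }
}
```

In real Flink code the idleness knob is `WatermarkStrategy.forBoundedOutOfOrderness(...).withIdleness(someDuration)`; the duration value needs tuning per workload, as the comment above suggests.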