Grzegorz Liter created FLINK-32828:
--------------------------------------
Summary: Kafka-partition-aware watermark not handled correctly
shortly after job start up
Key: FLINK-32828
URL: https://issues.apache.org/jira/browse/FLINK-32828
Project: Flink
Issue Type: Bug
Components: API / DataStream, Connectors / Kafka
Affects Versions: 1.17.1
Environment: Affected environments:
* Local MiniCluster + Confluent Kafka running in Docker
** See attached files
* Flink job running in Kubernetes using Flink Operator 1.15 + a 3-node Kafka cluster
running in Kubernetes
Reporter: Grzegorz Liter
Attachments: docker-compose.yml, test-job.java
When using KafkaSource with partition-aware watermarks, watermarks are emitted
even when only one partition has received events, shortly after the job starts up
from a savepoint/checkpoint. Once events have arrived on the other partitions, the
watermark behaviour is correct and the watermark is emitted as the minimum of the
per-partition watermarks.
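For reference, the expected partition-aware behaviour can be modelled as follows. Each source subtask keeps one watermark per assigned partition and forwards only the minimum, so a partition that has produced nothing yet (still at {{Long.MIN_VALUE}}) holds the subtask's watermark back. The per-partition value mirrors the formula used by Flink's {{BoundedOutOfOrdernessWatermarks}} (max timestamp minus the out-of-orderness minus 1 ms). This is a simplified, stand-alone sketch under those assumptions, not Flink's implementation; the class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical model of partition-aware watermarking inside one source subtask.
 * Not Flink code: it only reproduces the "minimum over all assigned partitions" rule.
 */
public class PartitionAwareWatermarkSketch {

    private final long outOfOrdernessMillis;
    // Highest event timestamp seen per partition; Long.MIN_VALUE means "no event yet".
    private final Map<Integer, Long> maxTimestampPerPartition = new LinkedHashMap<>();

    PartitionAwareWatermarkSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
    }

    // A newly assigned partition starts without any watermark.
    void assign(int partition) {
        maxTimestampPerPartition.putIfAbsent(partition, Long.MIN_VALUE);
    }

    // The per-partition max timestamp only moves forward.
    void onEvent(int partition, long timestampMillis) {
        maxTimestampPerPartition.merge(partition, timestampMillis, Math::max);
    }

    // Per-partition watermark: max timestamp - out-of-orderness - 1 ms,
    // or Long.MIN_VALUE while the partition has produced nothing.
    long partitionWatermark(int partition) {
        long max = maxTimestampPerPartition.get(partition);
        return max == Long.MIN_VALUE ? Long.MIN_VALUE : max - outOfOrdernessMillis - 1;
    }

    // Subtask watermark: minimum over ALL assigned partitions, so a silent
    // partition keeps the result at Long.MIN_VALUE.
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (int partition : maxTimestampPerPartition.keySet()) {
            min = Math.min(min, partitionWatermark(partition));
        }
        return maxTimestampPerPartition.isEmpty() ? Long.MIN_VALUE : min;
    }

    public static void main(String[] args) {
        PartitionAwareWatermarkSketch s =
                new PartitionAwareWatermarkSketch(Duration.ofSeconds(10L));
        s.assign(1);
        s.assign(3);
        s.onEvent(3, 50_000L);
        // Partition 1 is silent, so the watermark must not advance.
        System.out.println(s.combinedWatermark()); // -9223372036854775808
        s.onEvent(1, 45_000L);
        // Now both partitions contribute; partition 1 is the slower one.
        System.out.println(s.combinedWatermark()); // 34999
    }
}
```

In this model the report's "Expected" lines correspond to {{combinedWatermark()}} staying at {{Long.MIN_VALUE}} until every assigned partition has seen at least one event.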
Steps to reproduce:
# Setup a Kafka cluster with a topic that has 2 or more partitions.
## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic
test-2 --partitions 4}}
# Create a job that (see attached {{test-job.java}}):
## uses a KafkaSource with {{WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L))}}
## has parallelism lower than the number of partitions
## takes checkpoints/savepoints
# Start the job
# Send events to a single partition only
## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic
test-2 --property "parse.key=true" --property "key.separator=:"}}
{{14:51:19,883 WARN com.example.TestJob6$InputSink2 [] - ==
Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark
-292275055-05-16T16:47:04.192Z}}
{{14:51:32,484 WARN com.example.TestJob6$InputSink2 [] - ==
Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark
-292275055-05-16T16:47:04.192Z}}
{{14:51:35,914 WARN com.example.TestJob6$InputSink2 [] - ==
Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark
-292275055-05-16T16:47:04.192Z}}
Expected: Watermark does not progress. Actual: Watermark does not progress.
5. Stop the job
6. Start the job from the last checkpoint/savepoint
7. Send events to a single partition only
{{14:53:41,693 WARN com.example.TestJob6$InputSink2 [] - ==
Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark
-292275055-05-16T16:47:04.192Z}}
{{14:53:46,088 WARN com.example.TestJob6$InputSink2 [] - ==
Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark
2023-08-10T12:53:30.661Z}}
{{14:53:49,520 WARN com.example.TestJob6$InputSink2 [] - ==
Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark
2023-08-10T12:53:35.077Z}}
Expected: Watermark does not progress. {color:#FF0000}*Actual: Watermark
progresses*{color}
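The logged values can be checked by hand. The watermark {{-292275055-05-16T16:47:04.192Z}} is {{Long.MIN_VALUE}} rendered as an {{Instant}}, i.e. "no watermark yet". After the restart, each logged watermark equals the previous partition-3 event timestamp minus 10 s minus 1 ms, so the watermark reaching the sink is derived entirely from partition 3 while the other partitions have produced nothing. The helper below is hypothetical and only reproduces that arithmetic, assuming the bounded-out-of-orderness formula.

```java
import java.time.Duration;
import java.time.Instant;

public class RestartWatermarkCheck {

    // Hypothetical helper: watermark for forBoundedOutOfOrderness(10 s),
    // computed as maxTimestamp - 10 s - 1 ms.
    static Instant boundedWatermark(Instant maxTimestamp) {
        return maxTimestamp.minus(Duration.ofSeconds(10L)).minusMillis(1L);
    }

    public static void main(String[] args) {
        // The initial watermark in the logs is Long.MIN_VALUE as an Instant.
        System.out.println(Instant.ofEpochMilli(Long.MIN_VALUE));
        // -292275055-05-16T16:47:04.192Z

        // Watermarks after restart, derived from partition 3's previous events only.
        System.out.println(boundedWatermark(Instant.parse("2023-08-10T12:53:40.662Z")));
        // 2023-08-10T12:53:30.661Z
        System.out.println(boundedWatermark(Instant.parse("2023-08-10T12:53:45.078Z")));
        // 2023-08-10T12:53:35.077Z
    }
}
```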
{color:#172b4d}To add a bit more context:{color}
{color:#172b4d}8. Send events to the other partitions, then send events to a single
partition only{color}
{{{color:#172b4d}14:54:55,112 WARN com.example.TestJob6$InputSink2
[] - == Received: test-2/0: 2 -> a, timestamp 2023-08-10T12:54:54.104Z,
watermark 2023-08-10T12:53:38.510Z
14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - ==
Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark
2023-08-10T12:53:38.510Z
14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - ==
Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark
2023-08-10T12:53:38.510Z
14:55:12,821 WARN com.example.TestJob6$InputSink2 [] - ==
Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark
2023-08-10T12:54:44.103Z
14:55:16,099 WARN com.example.TestJob6$InputSink2 [] - ==
Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark
2023-08-10T12:54:44.103Z
14:55:19,122 WARN com.example.TestJob6$InputSink2 [] - ==
Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark
2023-08-10T12:54:44.103Z
{color}}}
{color:#172b4d}Expected: Watermark should progress a bit and then stop progressing
while events arrive on a single partition only. {color}
{color:#172b4d}Actual: As expected{color}
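The step-8 values are consistent with the minimum rule. Taking the latest logged timestamp for each partition, partition 0's 12:54:54.104 event yields the smallest per-partition watermark, 12:54:54.104 minus 10 s minus 1 ms = 12:54:44.103, which is the value that stays fixed while only partition 3 receives events. The sketch below assumes the bounded-out-of-orderness formula; the class name is hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.TreeMap;

public class Step8WatermarkCheck {

    static final Duration OUT_OF_ORDERNESS = Duration.ofSeconds(10L);

    // Per-partition watermark for forBoundedOutOfOrderness: max timestamp - 10 s - 1 ms.
    static Instant partitionWatermark(Instant maxTimestamp) {
        return maxTimestamp.minus(OUT_OF_ORDERNESS).minusMillis(1L);
    }

    // Partition-aware watermark: the minimum of the per-partition watermarks.
    static Instant combined(Map<Integer, Instant> maxTimestampPerPartition) {
        return maxTimestampPerPartition.values().stream()
                .map(Step8WatermarkCheck::partitionWatermark)
                .min(Instant::compareTo)
                .orElseThrow();
    }

    public static void main(String[] args) {
        // Latest event timestamp logged for each partition in step 8.
        Map<Integer, Instant> latest = new TreeMap<>();
        latest.put(0, Instant.parse("2023-08-10T12:54:54.104Z"));
        latest.put(1, Instant.parse("2023-08-10T12:54:56.665Z"));
        latest.put(2, Instant.parse("2023-08-10T12:54:57.554Z"));
        latest.put(3, Instant.parse("2023-08-10T12:55:18.114Z"));
        // Partition 0 is the slowest, so it bounds the combined watermark.
        System.out.println(combined(latest)); // 2023-08-10T12:54:44.103Z
    }
}
```

Further events on partition 3 alone cannot move this minimum, which matches the last three log lines of step 8.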
{color:#172b4d}This behavior also shows up as a burst of late events just after
startup, followed by no late events once the job operates normally. {color}