Hi,

I'm seeing an odd behavior for Kafka source where some records are dropped
during recovery.

My test set up is: Kafka source topic -> pass through flink job -> Kafka
sink topic
There are 10 partitions in the source & sink topics.

Test Steps
* Start the flink job, send 5 records (first batch) to the source topic,
and read the sink. I see all 5 records without issue.
* Stop the job with a savepoint
* Send another 10 records (second batch) to the source topic
* Start the job with the savepoint

Expect: read from the beginning of the sink topic, I should see all 15
records from the first and second batches.
Actual: Some random records in the second batches are missing.

My guess is that the savepoint only contains offsets with partitions that
received records from the first batch. Other partitions didn't have a state
and by default read from the `latest` offset during recovery. So records
from the second batch that fell into the previously empty partitions are
never processed.

However, based on the source code
<https://github.com/apache/flink/blob/release-1.14/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L579-L582>,
I'd expect the partitions without records from the 1st batch to be
initialized with `earliest-offset`. But this is not the behavior I saw.
What do I miss?

I'm using Flink 1.14.3. May I know  if there is anything I missed? If not,
what's the reason for such behavior? Otherwise, is this a bug?



Thanks,
Sharon

Reply via email to