Hi,

I'm seeing odd behavior with the Kafka source where some records are dropped during recovery.

My test setup is: Kafka source topic -> pass-through Flink job -> Kafka sink topic. There are 10 partitions in both the source and sink topics.

Test steps:
* Start the Flink job, send 5 records (first batch) to the source topic, and read the sink topic. I see all 5 records without issue.
* Stop the job with a savepoint.
* Send another 10 records (second batch) to the source topic.
* Start the job from the savepoint.

Expected: reading from the beginning of the sink topic, I should see all 15 records from the first and second batches.
Actual: some random records from the second batch are missing.

My guess is that the savepoint only contains offsets for the partitions that received records from the first batch. The other partitions have no state and by default read from the `latest` offset during recovery, so records from the second batch that fell into the previously empty partitions are never processed.

However, based on the source code <https://github.com/apache/flink/blob/release-1.14/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L579-L582>, I'd expect the partitions without records from the first batch to be initialized with `earliest-offset`. But this is not the behavior I saw.

I'm using Flink 1.14.3. Is there anything I missed? If not, what's the reason for this behavior, or is this a bug?

Thanks,
Sharon
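To make my mental model of the restore path concrete, here is a simplified sketch (plain Python, not actual Flink code; the function name, sentinel value, and partition names are my own) of what I expect the linked lines in `FlinkKafkaConsumerBase` to do: partitions present in the savepoint keep their stored offsets, and partitions absent from it are seeded with an earliest-offset sentinel rather than `latest`.

```python
# Illustrative model only -- names and the sentinel value are mine,
# not Flink's actual sentinels.
EARLIEST_OFFSET = "earliest"

def assign_restored_offsets(discovered_partitions, restored_state):
    """Map each discovered partition to its starting offset.

    Partitions found in the savepoint keep their restored offset;
    partitions absent from it fall back to the earliest-offset
    sentinel, which is what I expect from the linked code.
    """
    subscribed = {}
    for partition in discovered_partitions:
        if partition in restored_state:
            subscribed[partition] = restored_state[partition]
        else:
            subscribed[partition] = EARLIEST_OFFSET
    return subscribed

# Example: 4 partitions, but the savepoint only has offsets for p0 and p2.
restored = {"p0": 5, "p2": 3}
print(assign_restored_offsets(["p0", "p1", "p2", "p3"], restored))
# {'p0': 5, 'p1': 'earliest', 'p2': 3, 'p3': 'earliest'}
```

Under this model, p1 and p3 should start from the earliest offset after restore, so no second-batch records should be lost, which is why the observed behavior surprises me.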
My test set up is: Kafka source topic -> pass through flink job -> Kafka sink topic There are 10 partitions in the source & sink topics. Test Steps * Start the flink job, send 5 records (first batch) to the source topic, and read the sink. I see all 5 records without issue. * Stop the job with a savepoint * Send another 10 records (second batch) to the source topic * Start the job with the savepoint Expect: read from the beginning of the sink topic, I should see all 15 records from the first and second batches. Actual: Some random records in the second batches are missing. My guess is that the savepoint only contains offsets with partitions that received records from the first batch. Other partitions didn't have a state and by default read from the `latest` offset during recovery. So records from the second batch that fell into the previously empty partitions are never processed. However, based on the source code <https://github.com/apache/flink/blob/release-1.14/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L579-L582>, I'd expect the partitions without records from the 1st batch to be initialized with `earliest-offset`. But this is not the behavior I saw. What do I miss? I'm using Flink 1.14.3. May I know if there is anything I missed? If not, what's the reason for such behavior? Otherwise, is this a bug? Thanks, Sharon