Thanks Qingsheng,
I'm using Flink SQL. I'll need to dig a bit deeper to see which Flink
consumer class is actually used.
I didn't find the log entry you pointed to. However, I found the following log
from the task manager. Note that in this test I sent 10 records to the source
topic, which has 10 partitions:
{"@timestamp":"2022-03-18T03:54:24.577Z","@version":"1","message":"Adding split(s) to reader: [[Partition: stream-00000000-67362410-8, StartingOffset: -1, StoppingOffset: -9223372036854775808], [Partition: stream-00000000-67362410-9, StartingOffset: 1, StoppingOffset: -9223372036854775808], [Partition: stream-00000000-67362410-6, StartingOffset: 3, StoppingOffset: -9223372036854775808], [Partition: stream-00000000-67362410-7, StartingOffset: 3, StoppingOffset: -9223372036854775808], [Partition: stream-00000000-67362410-4, StartingOffset: 2, StoppingOffset: -9223372036854775808], [Partition: stream-00000000-67362410-5, StartingOffset: 2, StoppingOffset: -9223372036854775808], [Partition: stream-00000000-67362410-2, StartingOffset: 3, StoppingOffset: -9223372036854775808], [Partition: stream-00000000-67362410-3, StartingOffset: 2, StoppingOffset: -9223372036854775808], [Partition: stream-00000000-67362410-0, StartingOffset: 1, StoppingOffset: -9223372036854775808], [Partition: stream-00000000-67362410-1, StartingOffset: 3, StoppingOffset: -9223372036854775808]]","logger_name":"org.apache.flink.connector.base.source.reader.SourceReaderBase","thread_name":"Source: KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159 (1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.073Z","@version":"1","message":"[Consumer clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to LATEST offset of partition stream-00000000-67362410-8","logger_name":"org.apache.kafka.clients.consumer.internals.SubscriptionState","thread_name":"Source Data Fetcher for Source: KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159 (1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.075Z","@version":"1","message":"[Consumer clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to offset 3 for partition stream-00000000-67362410-7","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source Data Fetcher for Source: KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159 (1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.078Z","@version":"1","message":"[Consumer clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to offset 1 for partition stream-00000000-67362410-9","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source Data Fetcher for Source: KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159 (1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.078Z","@version":"1","message":"[Consumer clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to offset 1 for partition stream-00000000-67362410-0","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source Data Fetcher for Source: KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159 (1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.078Z","@version":"1","message":"[Consumer clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to offset 3 for partition stream-00000000-67362410-2","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source Data Fetcher for Source: KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159 (1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.078Z","@version":"1","message":"[Consumer clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to offset 3 for partition stream-00000000-67362410-1","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source Data Fetcher for Source: KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159 (1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.079Z","@version":"1","message":"[Consumer clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to offset 2 for partition stream-00000000-67362410-4","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source Data Fetcher for Source: KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159 (1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.080Z","@version":"1","message":"[Consumer clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to offset 2 for partition stream-00000000-67362410-3","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source Data Fetcher for Source: KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159 (1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.082Z","@version":"1","message":"[Consumer clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to offset 3 for partition stream-00000000-67362410-6","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source Data Fetcher for Source: KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159 (1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.083Z","@version":"1","message":"[Consumer clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to offset 2 for partition stream-00000000-67362410-5","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source Data Fetcher for Source: KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159 (1/1)#0","level":"INFO","level_value":20000}
As you can see, the `stream-00000000-67362410-8` partition had
StartingOffset -1 and started from the LATEST offset during restore, which
confirms what I described.
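To read the split log above mechanically, the sketch below extracts each `[Partition: <name>, StartingOffset: <n>, StoppingOffset: <n>]` entry and decodes the negative values. It assumes the sentinel constants that, as far as I know, Flink 1.14's `KafkaPartitionSplit` defines (`LATEST_OFFSET = -1`, `EARLIEST_OFFSET = -2`, `COMMITTED_OFFSET = -3`, `NO_STOPPING_OFFSET = Long.MIN_VALUE`, which prints as -9223372036854775808). The class name `SplitLogDecoder` is hypothetical, and the constants are copied locally rather than imported, so please check them against the connector version you run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SplitLogDecoder {
    // Assumed to mirror the sentinels in Flink 1.14's KafkaPartitionSplit;
    // copied locally so the sketch has no Flink dependency.
    static final long LATEST_OFFSET = -1L;
    static final long EARLIEST_OFFSET = -2L;
    static final long COMMITTED_OFFSET = -3L;
    static final long NO_STOPPING_OFFSET = Long.MIN_VALUE; // prints as -9223372036854775808

    // Matches one "[Partition: <name>, StartingOffset: <n>, StoppingOffset: <n>]" entry.
    private static final Pattern SPLIT = Pattern.compile(
            "\\[Partition: ([^,]+), StartingOffset: (-?\\d+), StoppingOffset: (-?\\d+)\\]");

    // Human-readable meaning of a starting or stopping offset value.
    static String describe(long offset) {
        if (offset == LATEST_OFFSET) return "LATEST";
        if (offset == EARLIEST_OFFSET) return "EARLIEST";
        if (offset == COMMITTED_OFFSET) return "COMMITTED";
        if (offset == NO_STOPPING_OFFSET) return "none (unbounded)";
        return "offset " + offset;
    }

    // Partitions whose split starts from the LATEST sentinel instead of a concrete offset.
    static List<String> partitionsStartingAtLatest(String logMessage) {
        List<String> result = new ArrayList<>();
        Matcher m = SPLIT.matcher(logMessage);
        while (m.find()) {
            if (Long.parseLong(m.group(2)) == LATEST_OFFSET) {
                result.add(m.group(1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two entries copied from the "Adding split(s) to reader" message above.
        String msg = "Adding split(s) to reader: "
                + "[[Partition: stream-00000000-67362410-8, StartingOffset: -1, "
                + "StoppingOffset: -9223372036854775808], "
                + "[Partition: stream-00000000-67362410-9, StartingOffset: 1, "
                + "StoppingOffset: -9223372036854775808]]";
        Matcher m = SPLIT.matcher(msg);
        while (m.find()) {
            System.out.println(m.group(1)
                    + ": start=" + describe(Long.parseLong(m.group(2)))
                    + ", stop=" + describe(Long.parseLong(m.group(3))));
        }
        System.out.println("starts at LATEST: " + partitionsStartingAtLatest(msg));
    }
}
```

Run against the full message above, only `stream-00000000-67362410-8` comes back as starting from LATEST; every other partition has a concrete starting offset, which lines up with the fetcher seeking to the LATEST offset for that one partition only.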
On Fri, Mar 18, 2022 at 2:13 AM Qingsheng Ren <[email protected]> wrote:
> Hi Sharon,
>
> Could you check the log after starting the job from the savepoint? If you have
> INFO logging enabled, you will get an entry “Consumer subtask {} will start
> reading {} partitions with offsets in restored state: {}” [1] in the log,
> which shows the starting offset of partitions. This might be helpful to
> reveal the problem.
>
> BTW FlinkKafkaConsumer has been marked as deprecated since 1.14. Please
> consider switching to the new KafkaSource if you are developing new
> applications.
>
> [1]
> https://github.com/apache/flink/blob/a2df2665b6ff411a2aeb9b204fd9d46a2af0ecfa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L614-L618
>
> Best regards,
>
> Qingsheng
>
> > On Mar 18, 2022, at 13:28, Sharon Xie <[email protected]> wrote:
> >
> > Hi,
> >
> > I'm seeing odd behavior with the Kafka source: some records are
> > dropped during recovery.
> >
> > My test setup is: Kafka source topic -> pass-through Flink job -> Kafka
> > sink topic
> > There are 10 partitions in both the source and sink topics.
> >
> > Test Steps
> > * Start the Flink job, send 5 records (first batch) to the source topic,
> >   and read the sink. I see all 5 records without issue.
> > * Stop the job with a savepoint
> > * Send another 10 records (second batch) to the source topic
> > * Start the job with the savepoint
> >
> > Expected: reading from the beginning of the sink topic, I should see all 15
> > records from the first and second batches.
> > Actual: some random records from the second batch are missing.
> >
> > My guess is that the savepoint only contains offsets for partitions
> > that received records from the first batch. The other partitions have no
> > offset state and by default read from the `latest` offset during recovery,
> > so records from the second batch that landed in those previously empty
> > partitions are never processed.
> >
> > However, based on the source code, I'd expect the partitions without
> > records from the first batch to be initialized with `earliest-offset`, but
> > this is not the behavior I saw. What am I missing?
> >
> > I'm using Flink 1.14.3. If I haven't missed anything, what's the reason
> > for this behavior? Is it a bug?
> >
> >
> >
> > Thanks,
> > Sharon
>
>
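The hypothesis in the original report can be checked with a toy model. The sketch below is not Flink code: it assumes a hypothetical partitioner that sends record `i` to partition `(i * 3) % 10` (a real producer may spread unkeyed records differently), and it models a split whose saved starting offset is still the `-1` LATEST sentinel in one of two ways: resolved against the partition's end offset at restore time, which matches the `Seeking to LATEST offset` log line for partition 8, or treated as offset 0, which is the `earliest-offset` behavior the report expected. The class and method names are illustrative only.

```java
import java.util.Arrays;

public class RestoreSimulation {
    // Mirrors the "StartingOffset: -1" (LATEST) sentinel seen in the reader log.
    static final long LATEST_OFFSET = -1L;

    // Hypothetical partitioner: record i lands in partition (i * 3) % numPartitions.
    static int partitionOf(int record, int numPartitions) {
        return (record * 3) % numPartitions;
    }

    /**
     * Returns how many second-batch records a restored job would read.
     * If resolveSentinelOnRestore is true, a partition whose saved split still holds the
     * LATEST sentinel starts at that partition's end offset at restore time; otherwise
     * it starts at offset 0 (the "earliest" expectation).
     */
    static long recordsReadAfterRestore(int numPartitions, int firstBatch, int secondBatch,
                                        boolean resolveSentinelOnRestore) {
        long[] endOffsets = new long[numPartitions];   // all partitions start empty
        long[] savedOffsets = new long[numPartitions]; // split state, initially LATEST
        Arrays.fill(savedOffsets, LATEST_OFFSET);

        // First batch: the running job consumes every record, so each partition that
        // received one now has a concrete next offset in its split state.
        for (int i = 0; i < firstBatch; i++) {
            int p = partitionOf(i, numPartitions);
            endOffsets[p]++;
            savedOffsets[p] = endOffsets[p];
        }
        // Stop with savepoint (savedOffsets is frozen), then produce the second batch.
        for (int i = firstBatch; i < firstBatch + secondBatch; i++) {
            endOffsets[partitionOf(i, numPartitions)]++;
        }
        // Restore: resolve each split's starting offset and count what gets read.
        long read = 0;
        for (int p = 0; p < numPartitions; p++) {
            long start = savedOffsets[p];
            if (start == LATEST_OFFSET) {
                start = resolveSentinelOnRestore ? endOffsets[p] : 0L;
            }
            read += endOffsets[p] - start;
        }
        return read;
    }

    public static void main(String[] args) {
        System.out.println("sentinel resolved on restore: "
                + recordsReadAfterRestore(10, 5, 10, true) + " of 10");
        System.out.println("sentinel treated as earliest: "
                + recordsReadAfterRestore(10, 5, 10, false) + " of 10");
    }
}
```

With 5 first-batch and 10 second-batch records, the first strategy reads only 5 of the 10 new records (every record that landed in a partition the first batch never touched is skipped), while the second reads all 10. That pattern matches the "some random records are missing" symptom, though the model says nothing about which strategy the connector is meant to use.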