Hi, Charles,

I am used to read the state in the debug mode. I always set the breakpoint
at the return statemnet in `SavepointReader#read`.
Then I could find the state I need in the field `SavepointMetadataV2
savepointMetadata`.
Finally I could deserialize the state bytes with
`KafkaPartitionSplitSerializer`.

Best,
Hang

Charles Tan <ctangu...@gmail.com> 于2023年5月24日周三 06:27写道:

> Hi everyone,
>
> I have a few questions about reading KafkaSource state using the State
> Processor API. I have a simple Flink application which reads from a Kafka
> topic then produces to a different topic. After running the Flink job and
> stopping it with a savepoint, I then write a few more records to the input
> topic. When the job is resumed from this savepoint, it reads records from
> the position it left off, indicating that the job successfully used the
> savepoint to recover its position. When I inspect the savepoint file with
> the state processor API, I can read the "SourceReaderState" from the
> savepoint. However, the state is read as a Java byte array and I can't
> decode it or make any sense of it. I want to be able to read the savepoint
> state to find out exactly how much progress (partition/offset) a job has
> made in case it fails or is stopped.
>
> Does anyone have any ideas how I can deserialize the bytes from the Kafka
> source state or more generically how to read the Kafka source operator
> state from a savepoint?
>
> Here is the link to a github repository that contains the Flink job that I
> was running, a savepoint file, and the code I was using to try to read the
> savepoint. (https://github.com/charles-tan/flink-state-processor-example)
>
> Thanks,
> Charles
>

Reply via email to