Hi Charles,

I usually read the state in debug mode. I set a breakpoint at the return statement in `SavepointReader#read`; there I can find the state I need in the field `SavepointMetadataV2 savepointMetadata`. Finally, I deserialize the state bytes with `KafkaPartitionSplitSerializer`.
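The last step above can be sketched roughly as follows. This is a minimal illustration, not code from Charles's repository: it assumes `flink-connector-kafka` is on the classpath, and it round-trips a split through `KafkaPartitionSplitSerializer` so the snippet is self-contained. In practice the `byte[]` would be the state bytes you extracted in the debugger (note those bytes may carry a version header written by `SimpleVersionedSerialization`, which would need to be read off first).

```java
import java.io.IOException;

import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
import org.apache.kafka.common.TopicPartition;

public class SplitDecodeSketch {

    public static void main(String[] args) throws IOException {
        KafkaPartitionSplitSerializer serializer = new KafkaPartitionSplitSerializer();

        // Stand-in for the raw bytes pulled out of the savepoint in debug mode.
        KafkaPartitionSplit original =
                new KafkaPartitionSplit(new TopicPartition("input-topic", 0), 42L);
        byte[] stateBytes = serializer.serialize(original);

        // Deserialize with the serializer's current version; if the savepoint
        // was written by an older connector, pass the stored version instead.
        KafkaPartitionSplit split =
                serializer.deserialize(serializer.getVersion(), stateBytes);

        // The split exposes exactly the progress information Charles asked for.
        System.out.printf("%s-%d starting at offset %d%n",
                split.getTopic(), split.getPartition(), split.getStartingOffset());
    }
}
```

This prints the topic, partition, and starting offset recovered from the bytes, which is the partition/offset progress you want to inspect.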
Best,
Hang

Charles Tan <ctangu...@gmail.com> wrote on Wed, May 24, 2023, 06:27:
> Hi everyone,
>
> I have a few questions about reading KafkaSource state using the State
> Processor API. I have a simple Flink application which reads from a Kafka
> topic then produces to a different topic. After running the Flink job and
> stopping it with a savepoint, I then write a few more records to the input
> topic. When the job is resumed from this savepoint, it reads records from
> the position it left off, indicating that the job successfully used the
> savepoint to recover its position. When I inspect the savepoint file with
> the State Processor API, I can read the "SourceReaderState" from the
> savepoint. However, the state is read as a Java byte array and I can't
> decode it or make any sense of it. I want to be able to read the savepoint
> state to find out exactly how much progress (partition/offset) a job has
> made in case it fails or is stopped.
>
> Does anyone have any ideas how I can deserialize the bytes from the Kafka
> source state, or more generally how to read the Kafka source operator
> state from a savepoint?
>
> Here is the link to a GitHub repository that contains the Flink job that I
> was running, a savepoint file, and the code I was using to try to read the
> savepoint: https://github.com/charles-tan/flink-state-processor-example
>
> Thanks,
> Charles