Hi Elakiya 1. You can confirm if the checkpoint for the task has been triggered normally?
2. Also, If you stop the job, you need to use "STOP WITH SAVEPOINT" and specify the path to the savepoint when starting the Flink job for recovery. This is necessary to continue consuming from the historical offset correctly. Best, Feng On Thu, Sep 28, 2023 at 4:41 PM elakiya udhayanan <laks....@gmail.com> wrote: > Hi team, > > I have a Kafka topic named employee which uses confluent avro schema and > will emit the payload as below: > { > "id": "emp_123456", > "employee": { > "id": "123456", > "name": "sampleName" > } > } > I am using the upsert-kafka connector to consume the events from the above > Kafka topic as below using the Flink SQL DDL statement.The problem is the > connector is not committing the offset. Everytime, I submit the job, it > reads Kafka events from the beginning. Please let me know if we can commit > the offset for the read Kafka events. > > DDL Statement: > String statement = "CREATE TABLE Employee (\r\n" + > " id STRING,\r\n" + > " employee ROW(id STRING, name STRING\r\n" + > " ),\r\n" + > " PRIMARY KEY (i <http://employee.id/>d) NOT ENFORCED\r\n" + > ") WITH (\r\n" + > " 'connector' = 'upsert-kafka',\r\n" + > " 'topic' = 'employee',\r\n" + > " 'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" + > " 'key.format' = 'raw',\r\n" + > " 'value.format' = 'avro-confluent',\r\n" + > " 'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" > + > ")"; > Any help is appreciated TIA > > Thanks, > Elakiya >