Hi Elakiya,

1. Can you confirm whether checkpointing is enabled and being triggered
normally for this job? The Kafka source only commits offsets back to Kafka
when a checkpoint completes, so without successful checkpoints nothing is
ever committed. For example:
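A minimal sketch in the SQL client (the 10s interval is just an
illustrative value, not something from your setup):

    -- enable periodic checkpointing; the source commits its offsets
    -- back to Kafka only when a checkpoint completes
    SET 'execution.checkpointing.interval' = '10s';

Note that the committed group offsets are only used to expose consumer
progress for monitoring; on a restart the source takes its reading position
from the checkpoint or savepoint state, which is why point 2 matters.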

2. Also, if you stop the job, you should use STOP WITH SAVEPOINT and then
pass the savepoint path when you restart the Flink job, so that it recovers
its state and continues consuming from the correct historical offsets. A
sketch of that flow is below.
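A minimal sketch, again for the SQL client (the job id and savepoint path
are placeholders; STOP JOB ... WITH SAVEPOINT requires Flink 1.17+, and on
older versions the equivalent CLI calls are "flink stop --savepointPath
<dir> <jobId>" and "flink run -s <savepointPath> ..."):

    -- stop the running job and write a savepoint
    STOP JOB '<job_id>' WITH SAVEPOINT;

    -- before resubmitting the query, point the new run at that savepoint
    SET 'execution.savepoint.path' = '/path/to/savepoints/savepoint-xxxx';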


Best,
Feng


On Thu, Sep 28, 2023 at 4:41 PM elakiya udhayanan <laks....@gmail.com>
wrote:

> Hi team,
>
> I have a Kafka topic named employee which uses a Confluent Avro schema and
> emits payloads like the one below:
> {
>   "id": "emp_123456",
>   "employee": {
>     "id": "123456",
>     "name": "sampleName"
>   }
> }
> I am using the upsert-kafka connector to consume events from the above
> Kafka topic, via the Flink SQL DDL statement below. The problem is that the
> connector is not committing the offsets: every time I submit the job, it
> reads the Kafka events from the beginning. Please let me know if we can
> commit the offsets for the Kafka events that have already been read.
>
> DDL Statement:
> String statement = "CREATE TABLE Employee (\r\n" +
>     "  id STRING,\r\n" +
>     "  employee ROW(id STRING, name STRING),\r\n" +
>     "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
>     ") WITH (\r\n" +
>     "  'connector' = 'upsert-kafka',\r\n" +
>     "  'topic' = 'employee',\r\n" +
>     "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>     "  'key.format' = 'raw',\r\n" +
>     "  'value.format' = 'avro-confluent',\r\n" +
>     "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
>     ")";
> Any help is appreciated. TIA.
>
> Thanks,
> Elakiya
>
