Hi, Elakiya.

I think you could check any of the following:

   1. The TaskManager log, which shows whether the job is restoring from
   an existing checkpoint and, if so, the path of that checkpoint.
   2. The checkpoint ID after you restart your job. If the job is not
   restoring from a checkpoint, the ID starts from 1.
   3. The checkpoint section of the Flink UI, which shows detailed
   information about the restored checkpoint (ID and path).
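
If you would rather script the third check than click through the UI, the JobManager REST API exposes the same checkpoint statistics under /jobs/<jobId>/checkpoints. As far as I know, the "latest" section of the response carries a "restored" entry with the ID and external path when the job was restored from a checkpoint. Below is a minimal sketch that only fetches and prints the raw JSON. The class name, the helper names, and the REST address you pass in are my own assumptions, not anything from your setup.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper: fetches checkpoint statistics from the Flink REST API.
final class RestoredCheckpointCheck {

    // Builds the checkpoint-statistics URL for one job,
    // tolerating a trailing slash on the base address.
    static String checkpointsUrl(String restBase, String jobId) {
        String base = restBase.endsWith("/")
                ? restBase.substring(0, restBase.length() - 1)
                : restBase;
        return base + "/jobs/" + jobId + "/checkpoints";
    }

    // Sends a GET request and returns the raw JSON body.
    static String fetchCheckpointStats(String restBase, String jobId) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest
                .newBuilder(URI.create(checkpointsUrl(restBase, jobId)))
                .GET()
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IllegalStateException("Unexpected HTTP status: " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("usage: RestoredCheckpointCheck <restBaseUrl> <jobId>");
            return;
        }
        // Look for the "restored" entry in the printed JSON.
        System.out.println(fetchCheckpointStats(args[0], args[1]));
    }
}
```

Run it with the JobManager REST address (for example http://localhost:8081 on a default local setup) and the job ID shown in the UI, then look for the "restored" entry in the output.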


On Thu, Sep 28, 2023 at 9:21 PM elakiya udhayanan <laks....@gmail.com>
wrote:

> Hi Feng,
>
> Thanks for your response.
>
> 1. We have configured checkpointing to upload to an S3 location, and we
> see metadata files being created there. But we are unsure whether the job
> actually restarts from that checkpoint after a failure. Is there a way to
> test this? Also, does this apply to upgrades or enhancements to the job,
> and how can we commit the offset in such cases?
> 2. As for savepoints, we are currently exploring them.
> 3. I would like to know whether Flink provides any properties for
> committing Kafka offsets.
>
> Thanks,
> Elakiya
>
> On Thu, Sep 28, 2023 at 3:10 PM Feng Jin <jinfeng1...@gmail.com> wrote:
>
>> Hi Elakiya
>>
>> 1. Can you confirm whether checkpoints for the task are being triggered
>> normally?
>>
>> 2. Also, if you stop the job, you need to use "STOP WITH SAVEPOINT" and
>> specify the savepoint path when starting the Flink job for recovery.
>> This is necessary to continue consuming from the previous offset
>> correctly.
>>
>>
>> Best,
>> Feng
>>
>>
>> On Thu, Sep 28, 2023 at 4:41 PM elakiya udhayanan <laks....@gmail.com>
>> wrote:
>>
>>> Hi team,
>>>
>>> I have a Kafka topic named employee which uses a Confluent Avro schema
>>> and emits payloads like the one below:
>>> {
>>>   "id": "emp_123456",
>>>   "employee": {
>>>     "id": "123456",
>>>     "name": "sampleName"
>>>   }
>>> }
>>> I am using the upsert-kafka connector to consume events from the
>>> above Kafka topic, using the Flink SQL DDL statement below. The problem is
>>> that the connector is not committing the offset. Every time I submit the
>>> job, it reads the Kafka events from the beginning. Please let me know
>>> whether we can commit the offset for the Kafka events that have been read.
>>>
>>> DDL Statement:
>>> String statement = "CREATE TABLE Employee (\r\n" +
>>> "  id STRING,\r\n" +
>>> "  employee  ROW(id STRING, name STRING\r\n" +
>>> "  ),\r\n" +
>>> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
>>> ") WITH (\r\n" +
>>> "  'connector' = 'upsert-kafka',\r\n" +
>>> "  'topic' = 'employee',\r\n" +
>>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>> "  'key.format' = 'raw',\r\n" +
>>> "  'value.format' = 'avro-confluent',\r\n" +
>>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
>>> ")";
>>> Any help is appreciated. TIA.
>>>
>>> Thanks,
>>> Elakiya
>>>
>>

-- 
Best,
Hangxiang.
