Hi team,

I have a Kafka topic named employee that uses a Confluent Avro schema and
emits payloads like the one below:
{
  "id": "emp_123456",
  "employee": {
    "id": "123456",
    "name": "sampleName"
  }
}
I am consuming events from this topic with the upsert-kafka connector,
using the Flink SQL DDL statement below. The problem is that the connector
is not committing the offsets: every time I submit the job, it reads the
Kafka events from the beginning. Please let me know whether the offsets
for the consumed events can be committed.

DDL Statement:
String statement = "CREATE TABLE Employee (\r\n" +
"  id STRING,\r\n" +
"  employee ROW<id STRING, name STRING>,\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
")";
Any help is appreciated. TIA.

Thanks,
Elakiya
