Hi Elakiya,

I think you may need to flatten the columns across the key and value formats so that the id is available as a top-level column. You can then use that column as the primary key in the DDL.
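As a rough, untested sketch of what that could look like: the DDL below declares a top-level `id` column that is read from the Kafka message key, and keeps the nested `employee` row for the value. It assumes the producer writes the employee id as a plain string into the message key, which your mail does not state. The class name `EmployeeDdl` and the `'value.fields-include' = 'EXCEPT_KEY'` setting are my additions, not something from your job.

```java
public class EmployeeDdl {

    // Builds the CREATE TABLE statement. In a Flink job the returned string
    // would be passed to TableEnvironment.executeSql(...).
    static String build() {
        return String.join("\n",
            "CREATE TABLE Employee (",
            // Top-level key column. Assumption: the Kafka message key holds the employee id.
            "  id STRING,",
            // The nested value payload stays a ROW.
            "  employee ROW<id STRING, name STRING>,",
            // The primary key references the top-level column, not employee.id.
            "  PRIMARY KEY (id) NOT ENFORCED",
            ") WITH (",
            "  'connector' = 'upsert-kafka',",
            "  'topic' = 'employee',",
            "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',",
            "  'key.format' = 'raw',",
            "  'value.format' = 'avro-confluent',",
            "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',",
            // Assumption: the key column is not part of the Avro value schema.
            "  'value.fields-include' = 'EXCEPT_KEY'",
            ")");
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

If the message key is empty or holds something other than the id, this will not work as written, and the producer side would have to be changed first.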
Best,
Shammon FY

On Fri, Jun 23, 2023 at 6:36 PM elakiya udhayanan <laks....@gmail.com> wrote:

> Hi team,
>
> I have a Kafka topic named employee which uses confluent avro schema and
> will emit the payload as below:
>
> {
>   "employee": {
>     "id": "123456",
>     "name": "sampleName"
>   }
> }
>
> I am using the upsert-kafka connector to consume the events from the above
> Kafka topic as below using the Flink SQL DDL statement, also here I want to
> use the id field as the Primary key. But I am unable to use the id field
> since it is inside the object.
>
> DDL Statement:
> String statement = "CREATE TABLE Employee (\r\n" +
> "  employee  ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
> ")";
>
> Any help is appreciated TIA
>
> Thanks,
> Elakiya