Hi Jane,

Thanks for your response. Yes, it did throw a parsing error (from Apache
Calcite; Flink uses it internally, I guess).

Since I am creating this Flink table by consuming a Kafka topic, I don't
have the ability to change the Avro schema. If Flink doesn't support a
compound identifier as the primary key, maybe I can check the possibility
of introducing a new field outside of the employee object and using that
as the key, as sketched below.
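Something like the below is what I have in mind. To be clear, this is only
a rough sketch of the idea: the flat id column, the assumption that the
Kafka message key carries the employee id as a plain string, and the
'value.fields-include' = 'EXCEPT_KEY' option (so the Avro value schema
stays just the employee record) are all untested guesses on my side:

```
-- Rough sketch only: promote the key to its own top-level column instead
-- of declaring a nested field as the primary key.
-- Assumes the Kafka message key is the raw employee id string, and that
-- EXCEPT_KEY keeps the Avro value schema as just the employee record.
CREATE TABLE Employee (
  id STRING,                              -- new field outside the employee object
  employee ROW<id STRING, name STRING>,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'employee',
  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',
  'key.format' = 'raw',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',
  'value.fields-include' = 'EXCEPT_KEY'
)
```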
Thanks,
Elakiya

On Tue, Jul 11, 2023 at 3:12 PM Jane Chan <qingyue....@gmail.com> wrote:

> Hi Elakiya,
>
> Did you encounter a ParserException when executing the DDL? AFAIK, Flink
> SQL does not support declaring a nested column (compound identifier) as a
> primary key at the syntax level.
>
> A possible workaround is to change the schema so that it does not contain
> a record type; then you can change the DDL to the following:
>
> CREATE TABLE Employee (
>   id STRING PRIMARY KEY NOT ENFORCED,
>   name STRING
> ) WITH (
>   ...
> )
>
> Best regards,
> Jane
>
> On Mon, Jul 10, 2023 at 7:32 PM elakiya udhayanan <laks....@gmail.com> wrote:
>
>> Hi Hang,
>>
>> Once again, thanks for your response, but I think you have misunderstood
>> my question. At present we are only using the DDL form of the Table API,
>> and the only issue we face is that we are unable to set the primary key
>> field for the Flink table, since the value we want to use as the primary
>> key is nested inside the object, as mentioned in my question earlier.
>>
>> Re-posting my question here:
>>
>> I have a Kafka topic named employee which uses a Confluent Avro schema
>> and will emit the payload as below:
>>
>> {
>>   "employee": {
>>     "id": "123456",
>>     "name": "sampleName"
>>   }
>> }
>>
>> I am using the upsert-kafka connector to consume the events from the
>> above Kafka topic via the Flink SQL DDL statement below, where I want to
>> use the id field as the primary key. But I am unable to use the id field,
>> since it is inside the employee object.
>>
>> DDL Statement:
>>
>> String statement = "CREATE TABLE Employee (\r\n" +
>>     "  employee ROW(id STRING, name STRING),\r\n" +
>>     "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>>     ") WITH (\r\n" +
>>     "  'connector' = 'upsert-kafka',\r\n" +
>>     "  'topic' = 'employee',\r\n" +
>>     "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>     "  'key.format' = 'raw',\r\n" +
>>     "  'value.format' = 'avro-confluent',\r\n" +
>>     "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
>>     ")";
>>
>> On Mon, Jul 10, 2023 at 2:27 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>
>>> Hi, Elakiya.
>>>
>>> If everything is right for the KafkaTable, I think there must be a
>>> `user_id` field in the Kafka message key.
>>> We could see the code in the method `createKeyValueProjections` of
>>> `UpsertKafkaDynamicTableFactory`, as follows.
>>>
>>> ```
>>> private Tuple2<int[], int[]> createKeyValueProjections(
>>>         ResolvedCatalogTable catalogTable) {
>>>     ResolvedSchema schema = catalogTable.getResolvedSchema();
>>>     // primary key should be validated earlier
>>>     List<String> keyFields = schema.getPrimaryKey().get().getColumns();
>>>     DataType physicalDataType = schema.toPhysicalRowDataType();
>>>
>>>     Configuration tableOptions =
>>>             Configuration.fromMap(catalogTable.getOptions());
>>>     // upsert-kafka will set key.fields to primary key fields by default
>>>     tableOptions.set(KEY_FIELDS, keyFields);
>>>
>>>     int[] keyProjection =
>>>             createKeyFormatProjection(tableOptions, physicalDataType);
>>>     int[] valueProjection =
>>>             createValueFormatProjection(tableOptions, physicalDataType);
>>>
>>>     return Tuple2.of(keyProjection, valueProjection);
>>> }
>>> ```
>>>
>>> The primary keys will be put in the KEY_FIELDS option to create the key
>>> format projection, which will be used to get the fields from the Kafka
>>> message key.
>>>
>>> Best,
>>> Hang
>>>
>>> On Mon, Jul 10, 2023 at 16:41, elakiya udhayanan <laks....@gmail.com> wrote:
>>>
>>>> Hi Hang,
>>>>
>>>> The select query works absolutely fine; we have also implemented join
>>>> queries, which work without any issues.
>>>>
>>>> Thanks,
>>>> Elakiya
>>>>
>>>> On Mon, Jul 10, 2023 at 2:03 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>>>
>>>>> Hi, Elakiya.
>>>>>
>>>>> Maybe this DDL could be executed. Please execute the select SQL
>>>>> `select * from KafkaTable`. Then I think there will be some error, or
>>>>> the `user_id` will not be read correctly.
>>>>>
>>>>> Best,
>>>>> Hang
>>>>>
>>>>> On Mon, Jul 10, 2023 at 16:25, elakiya udhayanan <laks....@gmail.com> wrote:
>>>>>
>>>>>> Hi Hang Ruan,
>>>>>>
>>>>>> Thanks for your response. But the documentation has an example of
>>>>>> defining the primary key in the DDL statement (code below), so we
>>>>>> should be able to define the primary key in the DDL, right? We have
>>>>>> defined the primary key in our earlier use cases when it wasn't a
>>>>>> nested field. Please correct me if I have misunderstood anything.
>>>>>>
>>>>>> CREATE TABLE KafkaTable (
>>>>>>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
>>>>>>   `user_id` BIGINT,
>>>>>>   `item_id` BIGINT,
>>>>>>   `behavior` STRING,
>>>>>>   PRIMARY KEY (`user_id`) NOT ENFORCED
>>>>>> ) WITH (
>>>>>>   'connector' = 'upsert-kafka',
>>>>>>   ...
>>>>>>   'key.format' = 'json',
>>>>>>   'key.json.ignore-parse-errors' = 'true',
>>>>>>   'value.format' = 'json',
>>>>>>   'value.json.fail-on-missing-field' = 'false',
>>>>>>   'value.fields-include' = 'EXCEPT_KEY'
>>>>>> )
>>>>>>
>>>>>> Thanks,
>>>>>> Elakiya
>>>>>>
>>>>>> On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, elakiya.
>>>>>>>
>>>>>>> The upsert-kafka connector will read the primary keys from the Kafka
>>>>>>> message keys. We cannot define fields in the Kafka message values as
>>>>>>> the primary key.
>>>>>>>
>>>>>>> Best,
>>>>>>> Hang
>>>>>>>
>>>>>>> On Mon, Jul 10, 2023 at 13:49, elakiya udhayanan <laks....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi team,
>>>>>>>>
>>>>>>>> I have a Kafka topic named employee which uses a Confluent Avro
>>>>>>>> schema and will emit the payload as below:
>>>>>>>>
>>>>>>>> {
>>>>>>>>   "employee": {
>>>>>>>>     "id": "123456",
>>>>>>>>     "name": "sampleName"
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> I am using the upsert-kafka connector to consume the events from
>>>>>>>> the above Kafka topic via the Flink SQL DDL statement below. Here I
>>>>>>>> want to use the id field as the primary key.
>>>>>>>> But I am unable to use the id field, since it is nested inside
>>>>>>>> the employee object.
>>>>>>>>
>>>>>>>> DDL Statement:
>>>>>>>>
>>>>>>>> String statement = "CREATE TABLE Employee (\r\n" +
>>>>>>>>     "  employee ROW(id STRING, name STRING),\r\n" +
>>>>>>>>     "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>>>>>>>>     ") WITH (\r\n" +
>>>>>>>>     "  'connector' = 'upsert-kafka',\r\n" +
>>>>>>>>     "  'topic' = 'employee',\r\n" +
>>>>>>>>     "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>>>>>>>     "  'key.format' = 'raw',\r\n" +
>>>>>>>>     "  'value.format' = 'avro-confluent',\r\n" +
>>>>>>>>     "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
>>>>>>>>     ")";
>>>>>>>>
>>>>>>>> Any help is appreciated. TIA.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Elakiya