Hi Jane,

Thanks for your response. Yes, it did throw a parsing error (from Apache
Calcite, which I guess Flink uses internally).

Since I am creating this Flink table by consuming a Kafka topic, I don't
have the ability to change the Avro schema. If Flink doesn't support a
compound identifier as the primary key, maybe I can check the possibility
of introducing a new field outside of the employee object.
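
A minimal sketch of that idea, not verified against a running cluster. It
assumes the producer writes the employee id as the Kafka message key (a
plain string), which is where upsert-kafka reads primary key fields from.
The top-level `id` column and the class name `EmployeeTableDdl` are
hypothetical; the connector options are the ones already used in this
thread, plus 'value.fields-include' = 'EXCEPT_KEY' from the documentation
example quoted below.

```java
class EmployeeTableDdl {

    // Builds a DDL whose primary key is a top-level column, not a nested field.
    // Assumption: the Kafka message key carries the employee id as a raw string,
    // so `id` is read from the key and `employee` from the Avro value.
    static String buildDdl() {
        return String.join("\n",
            "CREATE TABLE Employee (",
            "  id STRING,",
            "  employee ROW<id STRING, name STRING>,",
            "  PRIMARY KEY (id) NOT ENFORCED",
            ") WITH (",
            "  'connector' = 'upsert-kafka',",
            "  'topic' = 'employee',",
            "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',",
            "  'key.format' = 'raw',",
            "  'value.format' = 'avro-confluent',",
            "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',",
            // Keep the key column out of the value schema so the value still
            // matches the existing Avro record that only contains `employee`.
            "  'value.fields-include' = 'EXCEPT_KEY'",
            ")");
    }

    public static void main(String[] args) {
        // Print the statement; in a Flink job it would be passed to
        // TableEnvironment.executeSql(...) instead.
        System.out.println(buildDdl());
    }
}
```

If the message key is not the employee id today, this would not work as
written; the producer would have to set the key, which may or may not be
easier than changing the Avro schema.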

Thanks,
Elakiya


On Tue, Jul 11, 2023 at 3:12 PM Jane Chan <qingyue....@gmail.com> wrote:

> Hi Elakiya,
>
> Did you encounter a ParserException when executing the DDL? AFAIK, Flink
> SQL does not support declaring a nested column (compound identifier) as
> primary key at syntax level.
>
> A possible workaround is to change the schema so that it does not contain
> a record type; then you can change the DDL to the following:
>
> CREATE TABLE Employee (
>   id STRING PRIMARY KEY NOT ENFORCED,
>   name STRING
> ) WITH (
>   ...
> )
>
> Best regards,
> Jane
>
> On Mon, Jul 10, 2023 at 7:32 PM elakiya udhayanan <laks....@gmail.com>
> wrote:
>
>> Hi Hang,
>> Once again, thanks for your response, but I think you have misunderstood
>> my question. At present we are only using the DDL format of the Table API,
>> and the only issue we face is that we are unable to set the primary key
>> field for the Flink table, since the value we want to use as the primary
>> key is inside the object, as mentioned in my earlier question.
>>
>> Re-posting my question again here:
>>
>> I have a Kafka topic named employee which uses confluent avro schema and
>> will emit the payload as below:
>>
>> {"employee": {"id": "123456","name": "sampleName"}}
>>
>> I am using the upsert-kafka connector to consume the events from the
>> above Kafka topic as below using the Flink SQL DDL statement, also here I
>> want to use the id field as the Primary key. But I am unable to use the id
>> field since it is inside the object.
>>
>> DDL Statement:
>> String statement = "CREATE TABLE Employee (\r\n" +
>> "  employee  ROW(id STRING, name STRING\r\n" +
>> "  ),\r\n" +
>> "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>> ") WITH (\r\n" +
>> "  'connector' = 'upsert-kafka',\r\n" +
>> "  'topic' = 'employee',\r\n" +
>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>> "  'key.format' = 'raw',\r\n" +
>> "  'value.format' = 'avro-confluent',\r\n" +
>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
>> ")";
>>
>> On Mon, Jul 10, 2023 at 2:27 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>
>>> Hi, Elakiya.
>>>
>>> If everything is right for the KafkaTable, I think there must be a
>>> `user_id` field in the Kafka message key.
>>> We could see the code in the method `createKeyValueProjections` of
>>> `UpsertKafkaDynamicTableFactory` as follows.
>>>
>>> ```
>>>     private Tuple2<int[], int[]> createKeyValueProjections(ResolvedCatalogTable catalogTable) {
>>>         ResolvedSchema schema = catalogTable.getResolvedSchema();
>>>         // primary key should validated earlier
>>>         List<String> keyFields = schema.getPrimaryKey().get().getColumns();
>>>         DataType physicalDataType = schema.toPhysicalRowDataType();
>>>
>>>         Configuration tableOptions = Configuration.fromMap(catalogTable.getOptions());
>>>         // upsert-kafka will set key.fields to primary key fields by default
>>>         tableOptions.set(KEY_FIELDS, keyFields);
>>>
>>>         int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
>>>         int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
>>>
>>>         return Tuple2.of(keyProjection, valueProjection);
>>>     }
>>> ```
>>> The primary keys will be put in the KEY_FIELDS option to create the key
>>> format projection, which will be used to get fields from Kafka message key.
>>>
>>> Best,
>>> Hang
>>>
>>> On Mon, Jul 10, 2023 at 16:41, elakiya udhayanan <laks....@gmail.com> wrote:
>>>
>>>> Hi Hang,
>>>>
>>>> The select query works absolutely fine. We have also implemented join
>>>> queries, which also work without any issues.
>>>>
>>>> Thanks,
>>>> Elakiya
>>>>
>>>> On Mon, Jul 10, 2023 at 2:03 PM Hang Ruan <ruanhang1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, Elakiya.
>>>>>
>>>>> Maybe this DDL could be executed. Please execute the select sql
>>>>> `select * from KafkaTable`. Then I think there will be some error or the
>>>>> `user_id` will not be read correctly.
>>>>>
>>>>> Best,
>>>>> Hang
>>>>>
>>>>> On Mon, Jul 10, 2023 at 16:25, elakiya udhayanan <laks....@gmail.com> wrote:
>>>>>
>>>>>> Hi Hang Ruan,
>>>>>>
>>>>>> Thanks for your response. But the documentation has an example of
>>>>>> defining the primary key in the DDL statement (code below). In that
>>>>>> case we should be able to define the primary key in the DDL, right?
>>>>>> We have defined the primary key in our earlier use cases when it
>>>>>> wasn't a nested field. Please correct me if I have misunderstood
>>>>>> anything.
>>>>>>
>>>>>> CREATE TABLE KafkaTable (
>>>>>>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
>>>>>>   `user_id` BIGINT,
>>>>>>   `item_id` BIGINT,
>>>>>>   `behavior` STRING,
>>>>>>   PRIMARY KEY (`user_id`) NOT ENFORCED
>>>>>> ) WITH (
>>>>>>   'connector' = 'upsert-kafka',
>>>>>>   ...
>>>>>>   'key.format' = 'json',
>>>>>>   'key.json.ignore-parse-errors' = 'true',
>>>>>>   'value.format' = 'json',
>>>>>>   'value.json.fail-on-missing-field' = 'false',
>>>>>>   'value.fields-include' = 'EXCEPT_KEY'
>>>>>> )
>>>>>>
>>>>>> Thanks,
>>>>>> Elakiya
>>>>>>
>>>>>> On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan <ruanhang1...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, elakiya.
>>>>>>>
>>>>>>> The upsert-kafka connector will read the primary keys from the Kafka
>>>>>>> message keys. We cannot define the fields in the Kafka message values as
>>>>>>> the primary key.
>>>>>>>
>>>>>>> Best,
>>>>>>> Hang
>>>>>>>
>>>>>>> On Mon, Jul 10, 2023 at 13:49, elakiya udhayanan <laks....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi team,
>>>>>>>>
>>>>>>>> I have a Kafka topic named employee which uses confluent avro
>>>>>>>> schema and will emit the payload as below:
>>>>>>>>
>>>>>>>> {
>>>>>>>> "employee": {
>>>>>>>> "id": "123456",
>>>>>>>> "name": "sampleName"
>>>>>>>> }
>>>>>>>> }
>>>>>>>> I am using the upsert-kafka connector to consume the events from
>>>>>>>> the above Kafka topic using the Flink SQL DDL statement below. I want
>>>>>>>> to use the id field as the primary key, but I am unable to because
>>>>>>>> it is inside the object.
>>>>>>>>
>>>>>>>> DDL Statement:
>>>>>>>> String statement = "CREATE TABLE Employee (\r\n" +
>>>>>>>> "  employee  ROW(id STRING, name STRING\r\n" +
>>>>>>>> "  ),\r\n" +
>>>>>>>> "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>>>>>>>> ") WITH (\r\n" +
>>>>>>>> "  'connector' = 'upsert-kafka',\r\n" +
>>>>>>>> "  'topic' = 'employee',\r\n" +
>>>>>>>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>>>>>>> "  'key.format' = 'raw',\r\n" +
>>>>>>>> "  'value.format' = 'avro-confluent',\r\n" +
>>>>>>>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
>>>>>>>> ")";
>>>>>>>> Any help is appreciated. TIA.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Elakiya
>>>>>>>>
>>>>>>>
