Hi Hang,
Once again, thanks for your response, but I think you have misunderstood my
question. At present we are only using the DDL format of the Table API, and
the only issue we face is that we are unable to set the primary key for the
Flink table, since the value we want to use as the primary key is nested
inside the object, as mentioned in my earlier question.

Re-posting my question here:

I have a Kafka topic named employee which uses the Confluent Avro schema
and emits the payload below:

{"employee": {"id": "123456", "name": "sampleName"}}

I am using the upsert-kafka connector to consume the events from this
topic, as in the Flink SQL DDL statement below; here I want to use the id
field as the primary key. But I am unable to use the id field since it is
nested inside the employee object.

DDL statement:

String statement = "CREATE TABLE Employee (\r\n" +
"  employee  ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
")";

On Mon, Jul 10, 2023 at 2:27 PM Hang Ruan <ruanhang1...@gmail.com> wrote:

> Hi, Elakiya.
>
> If everything is right for the KafkaTable, I think there must be a
> `user_id` field in the Kafka message key.
> We can see this in the method `createKeyValueProjections` of
> `UpsertKafkaDynamicTableFactory`:
>
> ```
>     private Tuple2<int[], int[]>
> createKeyValueProjections(ResolvedCatalogTable catalogTable) {
>         ResolvedSchema schema = catalogTable.getResolvedSchema();
>         // primary key should be validated earlier
>         List<String> keyFields = schema.getPrimaryKey().get().getColumns();
>         DataType physicalDataType = schema.toPhysicalRowDataType();
>
>         Configuration tableOptions =
> Configuration.fromMap(catalogTable.getOptions());
>         // upsert-kafka will set key.fields to primary key fields by
> default
>         tableOptions.set(KEY_FIELDS, keyFields);
>
>         int[] keyProjection = createKeyFormatProjection(tableOptions,
> physicalDataType);
>         int[] valueProjection = createValueFormatProjection(tableOptions,
> physicalDataType);
>
>         return Tuple2.of(keyProjection, valueProjection);
>     }
> ```
> The primary key columns are put into the KEY_FIELDS option to create the
> key format projection, which is used to read those fields from the Kafka
> message key.
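>
> For example (a sketch, with made-up topic and server names), for the table
> below the connector derives KEY_FIELDS = [`user_id`], so the key format
> reads `user_id` from the Kafka message key while the value format reads
> the remaining columns from the message value:
>
> ```
> -- minimal upsert-kafka table whose primary key is a top-level column;
> -- 'key.fields' is derived from the PRIMARY KEY clause by the connector
> CREATE TABLE KafkaTable (
>   `user_id` BIGINT,
>   `behavior` STRING,
>   PRIMARY KEY (`user_id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'json',
>   'value.format' = 'json',
>   -- exclude the key column from the value schema
>   'value.fields-include' = 'EXCEPT_KEY'
> );
> ```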
>
> Best,
> Hang
>
> elakiya udhayanan <laks....@gmail.com> wrote on Mon, Jul 10, 2023 at 16:41:
>
>> Hi Hang,
>>
>>  The select query works absolutely fine; we have also implemented join
>> queries, which work without any issues.
>>
>> Thanks,
>> Elakiya
>>
>> On Mon, Jul 10, 2023 at 2:03 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>
>>> Hi, Elakiya.
>>>
>>> Maybe this DDL can be executed. Please run the query `select * from
>>> KafkaTable`. Then I think there will be an error, or the `user_id`
>>> will not be read correctly.
>>>
>>> Best,
>>> Hang
>>>
>>> elakiya udhayanan <laks....@gmail.com> wrote on Mon, Jul 10, 2023 at 16:25:
>>>
>>>> Hi Hang Ruan,
>>>>
>>>> Thanks for your response. But the documentation has an example of
>>>> defining the primary key in the DDL statement (code below). In that
>>>> case, we should be able to define the primary key in the DDL, right?
>>>> We have defined the primary key in our earlier use cases when it wasn't
>>>> a nested field. Please correct me if I have misunderstood anything.
>>>>
>>>> CREATE TABLE KafkaTable (
>>>>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
>>>>   `user_id` BIGINT,
>>>>   `item_id` BIGINT,
>>>>   `behavior` STRING,
>>>>   PRIMARY KEY (`user_id`) NOT ENFORCED
>>>> ) WITH (
>>>>   'connector' = 'upsert-kafka',
>>>>   ...
>>>>   'key.format' = 'json',
>>>>   'key.json.ignore-parse-errors' = 'true',
>>>>   'value.format' = 'json',
>>>>   'value.json.fail-on-missing-field' = 'false',
>>>>   'value.fields-include' = 'EXCEPT_KEY'
>>>> )
>>>>
>>>> Thanks,
>>>> Elakiya
>>>>
>>>> On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan <ruanhang1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, Elakiya.
>>>>>
>>>>> The upsert-kafka connector will read the primary keys from the Kafka
>>>>> message keys. We cannot define the fields in the Kafka message values as
>>>>> the primary key.
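>>>>>
>>>>> For example (a sketch using the payload from your mail), the id would
>>>>> have to be present in the Kafka message key itself, not only inside
>>>>> the value:
>>>>>
>>>>> ```
>>>>> key:   "123456"
>>>>> value: {"employee": {"id": "123456", "name": "sampleName"}}
>>>>> ```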
>>>>>
>>>>> Best,
>>>>> Hang
>>>>>
>>>>> elakiya udhayanan <laks....@gmail.com> wrote on Mon, Jul 10, 2023 at 13:49:
>>>>>
>>>>>> Hi team,
>>>>>>
>>>>>> I have a Kafka topic named employee which uses the Confluent Avro
>>>>>> schema and emits the payload below:
>>>>>>
>>>>>> {
>>>>>>   "employee": {
>>>>>>     "id": "123456",
>>>>>>     "name": "sampleName"
>>>>>>   }
>>>>>> }
>>>>>> I am using the upsert-kafka connector to consume the events from the
>>>>>> above Kafka topic, as in the Flink SQL DDL statement below; here I
>>>>>> want to use the id field as the primary key. But I am unable to use
>>>>>> the id field since it is nested inside the employee object.
>>>>>>
>>>>>> DDL Statement:
>>>>>> String statement = "CREATE TABLE Employee (\r\n" +
>>>>>> "  employee  ROW(id STRING, name STRING\r\n" +
>>>>>> "  ),\r\n" +
>>>>>> "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>>>>>> ") WITH (\r\n" +
>>>>>> "  'connector' = 'upsert-kafka',\r\n" +
>>>>>> "  'topic' = 'employee',\r\n" +
>>>>>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>>>>> "  'key.format' = 'raw',\r\n" +
>>>>>> "  'value.format' = 'avro-confluent',\r\n" +
>>>>>> "  'value.avro-confluent.url' = 
>>>>>> 'http://kafka-cp-schema-registry:8081',\r\n"
>>>>>> +
>>>>>> ")";
>>>>>> Any help is appreciated. TIA.
>>>>>>
>>>>>> Thanks,
>>>>>> Elakiya
>>>>>>
>>>>>
