Re: Regarding Connector Options - value.deserializer

2022-01-10 Thread Hang Ruan
Hi, Ronak,

We can not set specific 'value.deserializer' in table option.
'key.deserializer' and 'value.deserializer' is always set to
'org.apache.kafka.common.serialization.ByteArrayDeserializer'.

If you want to implement a format, you could take a look at the code
JsonFormatFactory.java in flink-formats/flink-json. And the format will be
loaded via SPI.

Best,
Hang

Ronak Beejawat (rbeejawa)  于2022年1月10日周一 17:51写道:

> Hi Hang,
>
>
>
> My question is can we use specific ‘value.deserializer’ in table option
> via kafka connector is there any way or not ? I have already kept
> 'value.format' in below code snippet so is that enough and handle
> deserializer by itself internally?
>
> How to create custom format can you please share any link for sample
> example for the same  ?
>
>
>
> Thanks
>
> Ronak Beejawat
>
>
>
>
>
>
>
> *From:* Hang Ruan 
> *Sent:* Monday, January 10, 2022 3:06 PM
> *To:* d...@flink.apache.org; Ronak Beejawat (rbeejawa) 
> *Cc:* commun...@flink.apache.org; user@flink.apache.org
> *Subject:* Re: Regarding Connector Options - value.deserializer
>
>
>
> Hi, Ronak,
>
>
>
> I think you should implement a custom format by yourself instead of
> overriding. The 'value.format' is a required table option.
>
>
>
> Best,
>
> Hang
>
>
>
> Ronak Beejawat (rbeejawa)  于2022年1月10日周一 17:09
> 写道:
>
> Hi Team,
>
> Is there any way we use value.deserializer in Connector Options from kafka
> via sql api?
>
> PFB below code snippt :
>
> tableEnv.executeSql("CREATE TABLE cmrTable (\r\n"
>  + "   org_id STRING\r\n"
>  + "   ,cluster_id STRING\r\n"
>  + "   ,globalcallid_callmanagerid STRING\r\n"
>  + "   ,globalcallid_callid INT\r\n"
>  + "   ,callidentifier INT\r\n"
>  + ",varvqmetrics STRING\r\n"
>  + ",duration INT\r\n"
>  + "   )\r\n"
>  + "   WITH (\r\n"
>  + "   'connector' = 'kafka'\r\n"
>  + "   ,'topic' = 'cmr'\r\n"
>  + "   ,'properties.bootstrap.servers' = '
> b-1.telemetry-msk-cluster.h1qn4w.c1.kafka.us-east-1.amazonaws.com:9092
> '\r\n"
>  + "   ,'scan.startup.mode' = 'earliest-offset'\r\n"
>  + "   ,'properties.value.deserializer' = 'json'\r\n"
>  + "   ,'value.format' = 'json'\r\n"
>  + "   )");
>
>
> Thanks
> Ronak Beejawat
>
>


Re: Regarding Connector Options - value.deserializer

2022-01-10 Thread Hang Ruan
Hi, Ronak,

I think you should implement a custom format by yourself instead of
overriding. The 'value.format' is a required table option.

Best,
Hang

Ronak Beejawat (rbeejawa)  于2022年1月10日周一
17:09写道:

> Hi Team,
>
> Is there any way we use value.deserializer in Connector Options from kafka
> via sql api?
>
> PFB below code snippt :
>
> tableEnv.executeSql("CREATE TABLE cmrTable (\r\n"
>  + "   org_id STRING\r\n"
>  + "   ,cluster_id STRING\r\n"
>  + "   ,globalcallid_callmanagerid STRING\r\n"
>  + "   ,globalcallid_callid INT\r\n"
>  + "   ,callidentifier INT\r\n"
>  + ",varvqmetrics STRING\r\n"
>  + ",duration INT\r\n"
>  + "   )\r\n"
>  + "   WITH (\r\n"
>  + "   'connector' = 'kafka'\r\n"
>  + "   ,'topic' = 'cmr'\r\n"
>  + "   ,'properties.bootstrap.servers' = '
> b-1.telemetry-msk-cluster.h1qn4w.c1.kafka.us-east-1.amazonaws.com:9092
> '\r\n"
>  + "   ,'scan.startup.mode' = 'earliest-offset'\r\n"
>  + "   ,'properties.value.deserializer' = 'json'\r\n"
>  + "   ,'value.format' = 'json'\r\n"
>  + "   )");
>
>
> Thanks
> Ronak Beejawat
>