Hi, Ronak,
We cannot set a specific 'value.deserializer' through the table options.
For the Kafka connector, 'key.deserializer' and 'value.deserializer' are
always set to 'org.apache.kafka.common.serialization.ByteArrayDeserializer'.
The raw bytes are then decoded by the format named in 'value.format', so
setting 'value.format' = 'json' is enough.
If you want to implement your own format, take a look at
JsonFormatFactory.java in flink-formats/flink-json. The format is loaded
via SPI.
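
As a rough sketch (not run against your cluster), this is what the DDL from
your snippet could look like once the 'properties.value.deserializer' entry
is dropped and only 'value.format' is kept. The class name CmrTableDdl and
the bootstrapServers parameter are made up for this example, and the
statement is only assembled as a string so that it runs without Flink on
the classpath.

```java
public class CmrTableDdl {

    // Builds the CREATE TABLE statement from the thread without the
    // 'properties.value.deserializer' option. The bootstrapServers
    // parameter is hypothetical and only exists for this illustration.
    public static String build(String bootstrapServers) {
        return String.join("\n",
            "CREATE TABLE cmrTable (",
            "  org_id STRING,",
            "  cluster_id STRING,",
            "  globalcallid_callmanagerid STRING,",
            "  globalcallid_callid INT,",
            "  callidentifier INT,",
            "  varvqmetrics STRING,",
            "  duration INT",
            ") WITH (",
            "  'connector' = 'kafka',",
            "  'topic' = 'cmr',",
            "  'properties.bootstrap.servers' = '" + bootstrapServers + "',",
            "  'scan.startup.mode' = 'earliest-offset',",
            "  'value.format' = 'json'",
            ")");
    }

    public static void main(String[] args) {
        // With a TableEnvironment at hand, this string would be passed to
        // tableEnv.executeSql(...); here it is only printed.
        System.out.println(build("localhost:9092"));
    }
}
```

The connector reads the record value as bytes and hands it to the JSON
format, so no deserializer class has to be named anywhere in the WITH clause.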
Best,
Hang
Ronak Beejawat (rbeejawa) wrote on Mon, Jan 10, 2022 at 17:51:
> Hi Hang,
>
>
>
> My question is: can we use a specific 'value.deserializer' in the table
> options of the Kafka connector? Is there any way to do that or not? I have
> already set 'value.format' in the code snippet below, so is that enough,
> and does it handle deserialization internally?
>
> How do I create a custom format? Can you please share a link to a sample
> example?
>
>
>
> Thanks
>
> Ronak Beejawat
>
> *From:* Hang Ruan
> *Sent:* Monday, January 10, 2022 3:06 PM
> *To:* d...@flink.apache.org; Ronak Beejawat (rbeejawa)
> *Cc:* commun...@flink.apache.org; user@flink.apache.org
> *Subject:* Re: Regarding Connector Options - value.deserializer
>
>
>
> Hi, Ronak,
>
>
>
> I think you should implement a custom format by yourself instead of
> overriding. The 'value.format' is a required table option.
>
>
>
> Best,
>
> Hang
>
>
>
> Ronak Beejawat (rbeejawa) wrote on Mon, Jan 10, 2022 at 17:09:
>
> Hi Team,
>
> Is there any way to use value.deserializer in the Kafka connector options
> via the SQL API?
>
> Please find the code snippet below:
>
> tableEnv.executeSql("CREATE TABLE cmrTable (\r\n"
> + " org_id STRING\r\n"
> + " ,cluster_id STRING\r\n"
> + " ,globalcallid_callmanagerid STRING\r\n"
> + " ,globalcallid_callid INT\r\n"
> + " ,callidentifier INT\r\n"
> + ",varvqmetrics STRING\r\n"
> + ",duration INT\r\n"
> + " )\r\n"
> + " WITH (\r\n"
> + " 'connector' = 'kafka'\r\n"
> + " ,'topic' = 'cmr'\r\n"
> + " ,'properties.bootstrap.servers' = 'b-1.telemetry-msk-cluster.h1qn4w.c1.kafka.us-east-1.amazonaws.com:9092'\r\n"
> + " ,'scan.startup.mode' = 'earliest-offset'\r\n"
> + " ,'properties.value.deserializer' = 'json'\r\n"
> + " ,'value.format' = 'json'\r\n"
> + " )");
>
>
> Thanks
> Ronak Beejawat
>
>