Hi, Ronak,

I think you should implement a custom format by yourself instead of
overriding. The 'value.format' is a required table option.

Best,
Hang

Ronak Beejawat (rbeejawa) <rbeej...@cisco.com.invalid> 于2022年1月10日周一
17:09写道:

> Hi Team,
>
> Is there any way we use value.deserializer in Connector Options from kafka
> via sql api?
>
> PFB below code snippt :
>
> tableEnv.executeSql("CREATE TABLE cmrTable (\r\n"
>              + "   org_id STRING\r\n"
>              + "   ,cluster_id STRING\r\n"
>              + "   ,globalcallid_callmanagerid STRING\r\n"
>              + "   ,globalcallid_callid INT\r\n"
>              + "   ,callidentifier INT\r\n"
>              + "    ,varvqmetrics STRING\r\n"
>              + "    ,duration INT\r\n"
>              + "   )\r\n"
>              + "   WITH (\r\n"
>              + "   'connector' = 'kafka'\r\n"
>              + "   ,'topic' = 'cmr'\r\n"
>              + "   ,'properties.bootstrap.servers' = '
> b-1.telemetry-msk-cluster.h1qn4w.c1.kafka.us-east-1.amazonaws.com:9092
> '\r\n"
>              + "   ,'scan.startup.mode' = 'earliest-offset'\r\n"
>              + "   ,'properties.value.deserializer' = 'json'\r\n"
>              + "   ,'value.format' = 'json'\r\n"
>              + "   )");
>
>
> Thanks
> Ronak Beejawat
>

Reply via email to