Hi Team,

Is there any way we use value.deserializer in Connector Options from kafka via 
sql api?

PFB below code snippt :

tableEnv.executeSql("CREATE TABLE cmrTable (\r\n"
             + "   org_id STRING\r\n"
             + "   ,cluster_id STRING\r\n"
             + "   ,globalcallid_callmanagerid STRING\r\n"
             + "   ,globalcallid_callid INT\r\n"
             + "   ,callidentifier INT\r\n"
             + "    ,varvqmetrics STRING\r\n"
             + "    ,duration INT\r\n"
             + "   )\r\n"
             + "   WITH (\r\n"
             + "   'connector' = 'kafka'\r\n"
             + "   ,'topic' = 'cmr'\r\n"
             + "   ,'properties.bootstrap.servers' = 
'b-1.telemetry-msk-cluster.h1qn4w.c1.kafka.us-east-1.amazonaws.com:9092'\r\n"
             + "   ,'scan.startup.mode' = 'earliest-offset'\r\n"
             + "   ,'properties.value.deserializer' = 'json'\r\n"
             + "   ,'value.format' = 'json'\r\n"
             + "   )");


Thanks
Ronak Beejawat

Reply via email to