1.????ddl????array??????????????select ????????????????????2.??????????????????????????????null??flink????????????array?????????????? CREATE TABLE sourceTable ( event_time_line array<ROW ( `rule_name` VARCHAR, `count` VARCHAR )> ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.startup-mode' = 'earliest-offset', 'connector.topic' = 'topic_test_1', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'update-mode' = 'append', 'format.type' = 'json', 'format.derive-schema' = 'true' ); --???????????? select event_time_line from sourceTable ; --??????????????????????????value??????????????????????size?????????? select type_change(event_time_line) from sourceTable ;
public class TypeChange extends ScalarFunction { /** * ??null???????????????? * @param rows * @return */ public String eval(Row [] rows){ return JSONObject.toJSONString(rows); } }