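[The original text of this post was lost to a character-encoding error; the two points below are reconstructed from the surviving keywords and the code that follows.]
1. When a field is declared with an array type in the DDL, selecting it directly with a plain select works.
2. When the same field is passed to a user-defined function, the values received by the function are null. Does Flink provide a supported way to handle array-typed fields?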
-- Kafka-backed source table declared with the legacy `connector.*` /
-- `format.*` descriptor properties. Records are read as JSON from
-- `topic_test_1`, starting at the earliest offset, in append mode.
CREATE TABLE sourceTable (
    -- Array of (rule_name, count) rows.
    -- The closing bracket of ARRAY<...> must be a literal `>`; an HTML-escaped
    -- `&gt;` (as produced by copying from a web page) is not valid SQL.
    -- `count` is back-quoted because it collides with the COUNT keyword.
    event_time_line ARRAY<ROW(
        `rule_name` VARCHAR,
        `count` VARCHAR
    )>
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.startup-mode' = 'earliest-offset',
    'connector.topic' = 'topic_test_1',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);
-- Query 1: read the array column directly from the source table.
SELECT event_time_line
FROM sourceTable;

-- Query 2: pass the same array column through the `type_change` scalar UDF.
-- NOTE(review): the original comment on this query was lost to an encoding
-- error; its surviving words ("value", "size") suggest it reported that the
-- UDF receives empty values while the array size is still correct -- confirm.
SELECT type_change(event_time_line)
FROM sourceTable;


/**
 * Scalar UDF that renders an array of {@link Row} values as a JSON string.
 *
 * <p>Each row is emitted as a positional JSON array of its field values, so
 * the result is a JSON array of arrays, in the same order as the input.
 */
public class TypeChange extends ScalarFunction {

    /**
     * Serializes the given rows to JSON.
     *
     * <p>The rows are not handed to the JSON library directly. {@code Row}
     * exposes its values only through {@code getField(int)}, which is not a
     * bean-style getter, so a bean-based serializer cannot see them.
     * NOTE(review): this is presumably why the values came out empty while
     * the size looked right -- confirm against the fastjson and Flink
     * versions in use.
     *
     * @param rows the array column value; may be {@code null} and may contain
     *             {@code null} elements
     * @return the JSON text; a {@code null} element is kept as a {@code null}
     *         entry in the serialized array
     */
    public String eval(Row[] rows) {
        if (rows == null) {
            // Keep the original behavior for a missing array: serialize as-is.
            return JSONObject.toJSONString(rows);
        }

        Object[][] values = new Object[rows.length][];
        for (int i = 0; i < rows.length; i++) {
            // A null row stays a null slot in the output array.
            values[i] = rows[i] == null ? null : fieldsOf(rows[i]);
        }
        return JSONObject.toJSONString(values);
    }

    /** Copies the positional field values of one row into a plain array. */
    private static Object[] fieldsOf(Row row) {
        Object[] fields = new Object[row.getArity()];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = row.getField(i);
        }
        return fields;
    }

}

回复