1.blink_planner ????ddl????array??????????????select ????????????????????
 2.blink_planner 
????????????????????????????????????????????????flink????????????


3.????????flink-planner????????????



CREATE TABLE sourceTable (
 
&nbsp;event_time_line array<ROW (
 
&nbsp; `rule_name` VARCHAR,
 
&nbsp; `count` VARCHAR
 
&nbsp;)&gt;
 
) WITH (
 
&nbsp;'connector.type' = 'kafka',
 
&nbsp;'connector.version' = 'universal',
 
&nbsp;'connector.startup-mode' = 'earliest-offset',
 
&nbsp;'connector.topic' = 'topic_test_1',
 
&nbsp;'connector.properties.zookeeper.connect' = 'localhost:2181',
 
&nbsp;'connector.properties.bootstrap.servers' = 'localhost:9092',
 
&nbsp;'update-mode' = 'append',
 
&nbsp;'format.type' = 'json',
 
&nbsp;'format.derive-schema' = 'true'
 
);
 
--????????????
 
select event_time_line from sourceTable ;
 
--??????????????????????????value??????????????????????size??????????
 
select type_change(event_time_line) from sourceTable ;
 
&nbsp;
 
public class TypeChange extends ScalarFunction {
 
&nbsp;&nbsp;&nbsp; /**
 
&nbsp;&nbsp;&nbsp;&nbsp; * 
??null??????????????????????????????????????????????????????????planner????????????????
 
&nbsp;&nbsp;&nbsp;&nbsp; * @param rows
 
&nbsp;&nbsp;&nbsp;&nbsp; * @return
 
&nbsp;&nbsp;&nbsp;&nbsp; */
 
&nbsp;&nbsp;&nbsp; public String eval(Row [] rows){
 
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp;return JSONObject.toJSONString(rows);
 
&nbsp;&nbsp;&nbsp; }
 
&nbsp;
 
}

Reply via email to