业务中使用flink sql group by操作后想收集每个分组下所有的数据,如下示例:
kafka源表: 班级 学号 姓名 年龄 1 20001 张三 15 2 20011 李四 16 1 20002 王五 16 2 20012 吴六 15 create table source_table ( class_no: INT, student_no: INT, name: STRING, age: INT ) with ( 'connector' = 'kafka', ... ); mongodb目标表: 班级 学生信息 1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no": 20002, "name":"王五", "age": 16}] 2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no": 20012, "name":"吴六", "age": 15}] create table sink_table ( class_no INT, students: ARRAY<ROW<student_no STRING, name STRING, age INT>> ) with ( 'connector' = 'mongodb', ... ); 查了下flink自带的系统函数,接近满足条件的只有collect函数。 insert into sink_table select class_no, collect(ROW(student_no, name, age) from source_table group by class_no; 但它返回的是Multiset类型,即Map<?, Integer>。如果key的类型是ROW,像我这种场景,直接写mongodb会抛错,因为它会自动强制将key的类型转成STRING。 何况这里我只想收集Array[ROW],相当于只要Map中的keyset,即去重后的Array。 三个问题: 1. 如果要收集去重的Array[ROW],有什么办法可以做到吗?想尽量做得通用,收集的数据类型是ROW<?>,而不只是case by case的针对特定类型的udf,如这里的ROW<student_no STRING, name STRING, age INT> 2. 如果要收集不去重的Array[ROW],又该怎么写? 3. 访问一个数据类型为Map<kt, vt>的数据中key和value,要用什么flink sql语法? 谢谢解答!