业务中使用flink sql group by操作后想收集每个分组下所有的数据,如下示例:


kafka源表:
班级     学号      姓名      年龄
1         20001    张三       15
2         20011    李四       16
1         20002    王五       16
2         20012    吴六       15


create table source_table (
   class_no: INT,
   student_no: INT,
   name: STRING,
   age: INT
) with (
   'connector' = 'kafka',
   ...
);


mongodb目标表:
班级     学生信息
1         [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no": 20002, 
"name":"王五", "age": 16}]
2         [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no": 20012, 
"name":"吴六", "age": 15}]


create table sink_table (
  class_no INT,
  students: ARRAY<ROW<student_no STRING, name STRING, age INT>>
) with (
  'connector' = 'mongodb',
  ...
);


查了下flink自带的系统函数,接近满足条件的只有collect函数。
insert into sink_table select class_no, collect(ROW(student_no, name, age) from 
source_table group by class_no;


但它返回的是Multiset类型,即Map<?, 
Integer>。如果key的类型是ROW,像我这种场景,直接写mongodb会抛错,因为它会自动强制将key的类型转成STRING。
何况这里我只想收集Array[ROW],相当于只要Map中的keyset,即去重后的Array。


三个问题: 
1. 如果要收集去重的Array[ROW],有什么办法可以做到吗?想尽量做得通用,收集的数据类型是ROW<?>,而不只是case by 
case的针对特定类型的udf,如这里的ROW<student_no STRING, name STRING, age INT>
2. 如果要收集不去重的Array[ROW],又该怎么写?
3. 访问一个数据类型为Map<kt, vt>的数据中key和value,要用什么flink sql语法?


谢谢解答!

回复