使用场景如下,将kafka源表通过flink sql处理成mongodb汇表存入。按照班级进行group 
by,输出对应班级所有的学生数据集合。请问用flink sql自带的collect函数能实现吗?如果能的话要怎么写sql? 
如果不能的话要怎么写UDAF,有例子参考吗?谢谢!

kafka源表:
班级     学号      姓名      年龄
1         20001    张三       15
2         20011    李四       16
1         20002    王五       16
2         20012    吴六       15

create table source_table (
   class_no: INT,
   student_no: INT,
   name: STRING,
   age: INT
) with (
   'connector' = 'kafka',
   ...
);



通过flink sql处理输出 ==>


mongodb目标表:
班级     学生信息
1         [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no": 20002, 
"name":"王五", "age": 16}]
2         [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no": 20012, 
"name":"吴六", "age": 15}]

create table sink_table (
  class_no INT,
  students: ARRAY<ROW<student_no STRING, name STRING, age INT>>
) with (
  'connector' = 'mongodb',
  ...
);

回复