SELECT class_no, collect(info)

FROM (

SELECT class_no, ROW(student_no, name, age) AS info

FROM source_table

)

GROUP BY class_no;


从SQL层面想到比较接近的方法,但multiset无法转array


从你的需求描述看,mongodb目标表的这种班级设计平时可能不太需要,如果是为了查某个班所有的学生的话,在查询的时候加个where条件即可,没有必要把明细数据再放到一个数组里面
感觉可能是你定义表结构和实际使用方面的问题,可以换个角度思考下

在 2021-12-03 08:36:57,"casel.chen" <casel_c...@126.com> 写道:
>可我要的最终结果不是string,最好是通用的Row类型,这样的话下次聚合其他维度就不用重复开发UDF了。
>类似我这样的需求应该其他人也会遇到吧?
>功能:collect出一个Multiset即map,key是数据本身,value是数据出现的次数,可以按出现次数排序等。
>           输出可以是去重或不去重的Array(按出现次数排序或不排序),也可以就是map本身
>
>
>目前collect函数可以输出一个Multiset即map,但要怎么按value即出现次数排序并只输出排序后的keyset,用flink sql要怎么写呢?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-12-02 09:58:28,"cyril cui" <cw68s...@gmail.com> 写道:
>>af里acc为个list,merge的时候合并,输出的时候 list拼成string即可
>>
>>casel.chen <casel_c...@126.com> 于2021年12月2日周四 上午9:46写道:
>>
>>> 使用场景如下,将kafka源表通过flink sql处理成mongodb汇表存入。按照班级进行group
>>> by,输出对应班级所有的学生数据集合。请问用flink sql自带的collect函数能实现吗?如果能的话要怎么写sql?
>>> 如果不能的话要怎么写UDAF,有例子参考吗?谢谢!
>>>
>>> kafka源表:
>>> 班级     学号      姓名      年龄
>>> 1         20001    张三       15
>>> 2         20011    李四       16
>>> 1         20002    王五       16
>>> 2         20012    吴六       15
>>>
>>> create table source_table (
>>>    class_no: INT,
>>>    student_no: INT,
>>>    name: STRING,
>>>    age: INT
>>> ) with (
>>>    'connector' = 'kafka',
>>>    ...
>>> );
>>>
>>>
>>>
>>> 通过flink sql处理输出 ==>
>>>
>>>
>>> mongodb目标表:
>>> 班级     学生信息
>>> 1         [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
>>> 20002, "name":"王五", "age": 16}]
>>> 2         [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
>>> 20012, "name":"吴六", "age": 15}]
>>>
>>> create table sink_table (
>>>   class_no INT,
>>>   students: ARRAY<ROW<student_no STRING, name STRING, age INT>>
>>> ) with (
>>>   'connector' = 'mongodb',
>>>   ...
>>> );
>>>
>>>

回复