我想要的是一个通用的收集ROW类型集合(ARRAY去重和不去重),不是只针对特定ROW<f1 STRING, f2 INT, ...> @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>") 这样写没有问题@DataTypeHint("ROW") 这样写会报错
在 2021-12-01 11:12:27,"Caizhi Weng" <tsreape...@gmail.com> 写道: >Hi! > >UDF 支持 ROW 类型,详见 [1] 中关于 ROW 的示例。 > >[1] >https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e8%87%aa%e5%8a%a8%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc > >casel.chen <casel_c...@126.com> 于2021年12月1日周三 上午7:56写道: > >> 业务中使用flink sql group by操作后想收集所有的数据,如下示例: >> >> >> kafka源表: >> 班级 学号 姓名 年龄 >> 1 20001 张三 15 >> 2 20011 李四 16 >> 1 20002 王五 16 >> 2 20012 吴六 15 >> >> >> create table source_table ( >> class_no: INT, >> student_no: INT, >> name: STRING, >> age: INT >> ) with ( >> 'connector' = 'kafka', >> ... >> ); >> >> >> mongodb目标表: >> 班级 学生信息 >> 1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no": >> 20002, "name":"王五", "age": 16}] >> 2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no": >> 20012, "name":"吴六", "age": 15}] >> >> >> create table sink_table ( >> class_no INT, >> students: ARRAY<ROW<student_no STRING, name STRING, age INT>> >> ) with ( >> 'connector' = 'mongodb', >> ... >> ); >> >> >> 查了下flink自带的系统函数,接近满足条件的只有collect函数。 >> insert into sink_table select class_no, collect(ROW(student_no, name, age) >> from source_table group by class_no; >> >> >> 但它返回的是Multiset类型,即Map<?, >> Integer>。如果key的类型是ROW,像我这种场景,直接写mongodb会抛错,因为它会自动强制将key的类型转成STRING。 >> 何况这里我只想收集Array[ROW],相当于只要Map中的keyset,即去重后的Array。 >> >> >> 1. >> 如果要收集去重的Array[ROW],有什么办法可以做到吗?我曾尝试写UDF,但UDF不支持ROW类型,只支持具体的数据类型,有何建议或参考例子? >> 2. 如果要收集不去重的Array[ROW],又该怎么写? >> 3. 访问一个数据类型为Map<kt, vt>的数据中key和value,分别要用什么flink sql语法? >> >> >> 谢谢解答! >> >> >> >> >> >> >> >> >> >> >> >>