我想要的是一个通用的收集ROW类型集合(ARRAY去重和不去重),不是只针对特定ROW<f1 STRING, f2 INT, ...>
@DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>") 这样写没有问题@DataTypeHint("ROW") 
这样写会报错














在 2021-12-01 11:12:27,"Caizhi Weng" <tsreape...@gmail.com> 写道:
>Hi!
>
>UDF 支持 ROW 类型,详见 [1] 中关于 ROW 的示例。
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e8%87%aa%e5%8a%a8%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc
>
>casel.chen <casel_c...@126.com> 于2021年12月1日周三 上午7:56写道:
>
>> 业务中使用flink sql group by操作后想收集所有的数据,如下示例:
>>
>>
>> kafka源表:
>> 班级     学号      姓名      年龄
>> 1         20001    张三       15
>> 2         20011    李四       16
>> 1         20002    王五       16
>> 2         20012    吴六       15
>>
>>
>> create table source_table (
>>    class_no: INT,
>>    student_no: INT,
>>    name: STRING,
>>    age: INT
>> ) with (
>>    'connector' = 'kafka',
>>    ...
>> );
>>
>>
>> mongodb目标表:
>> 班级     学生信息
>> 1         [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
>> 20002, "name":"王五", "age": 16}]
>> 2         [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
>> 20012, "name":"吴六", "age": 15}]
>>
>>
>> create table sink_table (
>>   class_no INT,
>>   students: ARRAY<ROW<student_no STRING, name STRING, age INT>>
>> ) with (
>>   'connector' = 'mongodb',
>>   ...
>> );
>>
>>
>> 查了下flink自带的系统函数,接近满足条件的只有collect函数。
>> insert into sink_table select class_no, collect(ROW(student_no, name, age)
>> from source_table group by class_no;
>>
>>
>> 但它返回的是Multiset类型,即Map<?,
>> Integer>。如果key的类型是ROW,像我这种场景,直接写mongodb会抛错,因为它会自动强制将key的类型转成STRING。
>> 何况这里我只想收集Array[ROW],相当于只要Map中的keyset,即去重后的Array。
>>
>>
>> 1.
>> 如果要收集去重的Array[ROW],有什么办法可以做到吗?我曾尝试写UDF,但UDF不支持ROW类型,只支持具体的数据类型,有何建议或参考例子?
>> 2. 如果要收集不去重的Array[ROW],又该怎么写?
>> 3. 访问一个数据类型为Map<kt, vt>的数据中key和value,分别要用什么flink sql语法?
>>
>>
>> 谢谢解答!
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>

回复