Re:Re: flink sql group by后收集数据问题

2021-12-01 文章 casel.chen



我想要的是一个通用的收集ROW类型集合(ARRAY去重和不去重),不是只针对特定ROW
@DataTypeHint("ROW") 这样写没有问题@DataTypeHint("ROW") 
这样写会报错














在 2021-12-01 11:12:27,"Caizhi Weng"  写道:
>Hi!
>
>UDF 支持 ROW 类型,详见 [1] 中关于 ROW 的示例。
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e8%87%aa%e5%8a%a8%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc
>
>casel.chen  于2021年12月1日周三 上午7:56写道:
>
>> 业务中使用flink sql group by操作后想收集所有的数据,如下示例:
>>
>>
>> kafka源表:
>> 班级 学号  姓名  年龄
>> 1 20001张三   15
>> 2 20011李四   16
>> 1 20002王五   16
>> 2 20012吴六   15
>>
>>
>> create table source_table (
>>class_no: INT,
>>student_no: INT,
>>name: STRING,
>>age: INT
>> ) with (
>>'connector' = 'kafka',
>>...
>> );
>>
>>
>> mongodb目标表:
>> 班级 学生信息
>> 1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
>> 20002, "name":"王五", "age": 16}]
>> 2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
>> 20012, "name":"吴六", "age": 15}]
>>
>>
>> create table sink_table (
>>   class_no INT,
>>   students: ARRAY>
>> ) with (
>>   'connector' = 'mongodb',
>>   ...
>> );
>>
>>
>> 查了下flink自带的系统函数,接近满足条件的只有collect函数。
>> insert into sink_table select class_no, collect(ROW(student_no, name, age)
>> from source_table group by class_no;
>>
>>
>> 但它返回的是Multiset类型,即Map> Integer>。如果key的类型是ROW,像我这种场景,直接写mongodb会抛错,因为它会自动强制将key的类型转成STRING。
>> 何况这里我只想收集Array[ROW],相当于只要Map中的keyset,即去重后的Array。
>>
>>
>> 1.
>> 如果要收集去重的Array[ROW],有什么办法可以做到吗?我曾尝试写UDF,但UDF不支持ROW类型,只支持具体的数据类型,有何建议或参考例子?
>> 2. 如果要收集不去重的Array[ROW],又该怎么写?
>> 3. 访问一个数据类型为Map的数据中key和value,分别要用什么flink sql语法?
>>
>>
>> 谢谢解答!
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>


Re:Re: flink sql group by后收集数据问题

2021-11-30 文章 casel.chen
我想要的是一个通用的收集ROW类型集合(ARRAY去重和不去重),不是只针对特定ROW
@DataTypeHint("ROW") 这样写没有问题@DataTypeHint("ROW") 
这样写会报错

在 2021-12-01 11:12:27,"Caizhi Weng"  写道:
>Hi!
>
>UDF 支持 ROW 类型,详见 [1] 中关于 ROW 的示例。
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e8%87%aa%e5%8a%a8%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc
>
>casel.chen  于2021年12月1日周三 上午7:56写道:
>
>> 业务中使用flink sql group by操作后想收集所有的数据,如下示例:
>>
>>
>> kafka源表:
>> 班级 学号  姓名  年龄
>> 1 20001张三   15
>> 2 20011李四   16
>> 1 20002王五   16
>> 2 20012吴六   15
>>
>>
>> create table source_table (
>>class_no: INT,
>>student_no: INT,
>>name: STRING,
>>age: INT
>> ) with (
>>'connector' = 'kafka',
>>...
>> );
>>
>>
>> mongodb目标表:
>> 班级 学生信息
>> 1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
>> 20002, "name":"王五", "age": 16}]
>> 2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
>> 20012, "name":"吴六", "age": 15}]
>>
>>
>> create table sink_table (
>>   class_no INT,
>>   students: ARRAY>
>> ) with (
>>   'connector' = 'mongodb',
>>   ...
>> );
>>
>>
>> 查了下flink自带的系统函数,接近满足条件的只有collect函数。
>> insert into sink_table select class_no, collect(ROW(student_no, name, age)
>> from source_table group by class_no;
>>
>>
>> 但它返回的是Multiset类型,即Map> Integer>。如果key的类型是ROW,像我这种场景,直接写mongodb会抛错,因为它会自动强制将key的类型转成STRING。
>> 何况这里我只想收集Array[ROW],相当于只要Map中的keyset,即去重后的Array。
>>
>>
>> 1.
>> 如果要收集去重的Array[ROW],有什么办法可以做到吗?我曾尝试写UDF,但UDF不支持ROW类型,只支持具体的数据类型,有何建议或参考例子?
>> 2. 如果要收集不去重的Array[ROW],又该怎么写?
>> 3. 访问一个数据类型为Map的数据中key和value,分别要用什么flink sql语法?
>>
>>
>> 谢谢解答!
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>