Re:Re:Re: Question about using the Flink SQL collect function

2021-12-03 Thread RS
SELECT class_no, collect(info)
FROM (
    SELECT class_no, ROW(student_no, name, age) AS info
    FROM source_table
)
GROUP BY class_no;


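This is the closest approach I could think of at the SQL level, but a MULTISET cannot be converted to an ARRAY.


From your description, this per-class design of the MongoDB target table is probably not something you need very often. If the goal is to look up all the students of a given class, adding a WHERE condition at query time is enough; there is no need to put the detail rows into an array.
I suspect the issue lies in how the table schema is defined versus how it is actually used, so it may be worth looking at it from a different angle.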
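
As an illustration of the multiset semantics discussed in this thread (a map from element to occurrence count), here is a minimal Python sketch of the conversion a custom function would have to perform. It is not Flink code: `multiset_to_array` and its parameters are hypothetical names, and `collections.Counter` only stands in for the MULTISET that collect returns.

```python
from collections import Counter


def multiset_to_array(multiset, distinct=True, by_count=True):
    """Flatten a multiset (element -> occurrence count) into a list.

    Hypothetical helper for illustration only; not part of Flink.
    """
    items = list(multiset.items())
    if by_count:
        # Most frequent first; sort is stable, so ties keep insertion order.
        items.sort(key=lambda kv: kv[1], reverse=True)
    if distinct:
        return [element for element, _ in items]
    # Repeat each element as many times as it occurred.
    return [element for element, count in items for _ in range(count)]


# Rows are modelled as (student_no, name, age) tuples so they are hashable.
rows = [(20001, "张三", 15), (20002, "王五", 16), (20001, "张三", 15)]
multiset = Counter(rows)

distinct_rows = multiset_to_array(multiset)
all_rows = multiset_to_array(multiset, distinct=False)
```

Here `distinct_rows` holds each row once, ordered by occurrence count, while `all_rows` keeps the duplicates.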

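At 2021-12-03 08:36:57, "casel.chen" wrote:
>But the final result I want is not a string; ideally it is a generic Row type, so that the next time I aggregate on another dimension I do not have to develop another UDF.
>Other people probably run into requirements like mine too, right?
>Feature: collect produces a Multiset, i.e. a map whose key is the data itself and whose value is the number of occurrences; it can be sorted by occurrence count, and so on.
>   The output can be a deduplicated or non-deduplicated Array (sorted by occurrence count or unsorted), or the map itself.
>
>
>Currently the collect function can output a Multiset, i.e. a map, but how do I sort it by value (the occurrence count) and output only the sorted key set? How would I write that in Flink SQL?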
>
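>At 2021-12-02 09:58:28, "cyril cui" wrote:
>>In the aggregate function, keep the accumulator (acc) as a list, merge the lists in merge, and join the list into a string on output.
>>
>>On Thu, Dec 2, 2021 at 9:46 AM casel.chen wrote:
>>
>>> The use case is as follows: a Kafka source table is processed with Flink SQL and written to a MongoDB sink table. The data is grouped
>>> by class, and the output is the collection of all student records for each class. Can this be done with Flink SQL's built-in collect function? If so, how should the SQL be written?
>>> If not, how should I write a UDAF? Is there an example I can refer to? Thanks!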
>>>
>>> Kafka source table:
>>> class  student_no  name  age
>>> 1      20001       张三  15
>>> 2      20011       李四  16
>>> 1      20002       王五  16
>>> 2      20012       吴六  15
>>>
>>> create table source_table (
>>>   class_no INT,
>>>   student_no INT,
>>>   name STRING,
>>>   age INT
>>> ) with (
>>>   'connector' = 'kafka',
>>>   ...
>>> );
>>>
>>>
>>>
>>> Output after processing with Flink SQL ==>
>>>
>>>
>>> MongoDB target table:
>>> class  student info
>>> 1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
>>> 20002, "name":"王五", "age": 16}]
>>> 2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
>>> 20012, "name":"吴六", "age": 15}]
>>>
>>> create table sink_table (
>>>   class_no INT,
>>>   students ARRAY<ROW<student_no INT, name STRING, age INT>>
>>> ) with (
>>>   'connector' = 'mongodb',
>>>   ...
>>> );
>>>
>>>
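
The list-accumulator approach suggested in this thread can be sketched in plain Python. This is only a model of the idea, not Flink's UDAF API: the class `CollectListAgg`, its method names, and the helper `group_collect` are hypothetical. Unlike the original suggestion, the sketch returns the list itself instead of joining it into a string, since the asker wants an array of rows.

```python
class CollectListAgg:
    """Hypothetical aggregate modelled on a UDAF's lifecycle methods."""

    def create_accumulator(self):
        # The accumulator is simply a list of collected rows.
        return []

    def accumulate(self, acc, row):
        acc.append(row)

    def merge(self, acc, other_accs):
        # Merging partial results is plain list concatenation.
        for other in other_accs:
            acc.extend(other)

    def get_value(self, acc):
        # Return a copy so callers cannot mutate the accumulator.
        return list(acc)


def group_collect(records):
    """Group (class_no, student) pairs by class_no using the aggregate."""
    agg = CollectListAgg()
    accs = {}
    for class_no, student in records:
        acc = accs.setdefault(class_no, agg.create_accumulator())
        agg.accumulate(acc, student)
    return {class_no: agg.get_value(acc) for class_no, acc in accs.items()}


# Sample rows taken from the source table in the question.
records = [
    (1, {"student_no": 20001, "name": "张三", "age": 15}),
    (2, {"student_no": 20011, "name": "李四", "age": 16}),
    (1, {"student_no": 20002, "name": "王五", "age": 16}),
    (2, {"student_no": 20012, "name": "吴六", "age": 15}),
]
result = group_collect(records)
```

With the sample rows, `result` maps each class number to the list of its student records, which is the shape of the MongoDB target table described above.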


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-03 Thread Till Rohrmann
Thanks for starting this discussion Yingjie,

How will our tests be affected by these changes? Will Flink require more
resources and, thus, will it risk destabilizing our testing infrastructure?

I would propose to create a FLIP for these changes since you propose to
change the default behaviour. It can be a very short one, though.

Cheers,
Till

On Fri, Dec 3, 2021 at 10:02 AM Yingjie Cao  wrote:

> Hi dev & users,
>
> We propose to change some default values of blocking shuffle to improve
> the out-of-the-box user experience (streaming is not affected). The default
> values we want to change are as follows:
>
> 1. Data compression
> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
> default value is 'false'.  Usually, data compression can reduce both disk
> and network IO which is good for performance. At the same time, it can save
> storage space. We propose to change the default value to true.
>
> 2. Default shuffle implementation
> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
> value is 'Integer.MAX_VALUE', which means that by default Flink jobs will
> always use hash-shuffle. In fact, for high parallelism, sort-shuffle is
> better for both stability and performance. So we propose to reduce the
> default value to a suitably smaller one, for example, 128. (We tested 128,
> 256, 512 and 1024 with TPC-DS, and 128 performed best.)
>
> 3. Read buffer of sort-shuffle
> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
> default value is '32M'. Previously, when choosing the default value, both
> '32M' and '64M' were OK in tests and we cautiously chose the smaller one.
> However, it was recently reported on the mailing list that the default
> value is not enough, which caused a buffer request timeout issue. We already
> created a ticket to improve the behavior. At the same time, we propose to
> increase this default value to '64M', which can also help.
>
> 4. Sort buffer size of sort-shuffle
> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> value is '64', which means 64 network buffers (32k per buffer by default).
> This default value is quite modest and can limit performance.
> We propose to increase this value to a larger one, for example, 512 (the
> default TM and network buffer configuration can serve more than 10
> result partitions concurrently).
>
> We already tested these default values together with the TPC-DS benchmark
> in a cluster, and both performance and stability improved a lot. These
> changes can help to improve the out-of-the-box experience of blocking
> shuffle. What do you think about these changes? Are there any concerns? If
> there are no objections, I will make these changes soon.
>
> Best,
> Yingjie
>


Re: ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed

2021-12-03 Thread su wenwen
Judging from the error, this is related to the log4j configuration file format. Try changing the log4j.properties file to log4j2.xml.


From: summer
Sent: December 2, 2021 11:32
To: user-zh@flink.apache.org
Subject: ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed

While integrating Flink 1.13.3 with CDH 6.3.2, the following error appears in the log when I run flink-sql:


ERROR StatusLogger No Log4j 2 configuration file found. Using default
configuration (logging only errors to the console), or user
programmatically provided configurations. Set system property
'log4j2.debug' to show Log4j 2 internal initialization logging. See
https://logging.apache.org/log4j/2.x/manual/configuration.html for
instructions on how to configure Log4j 2
10:45:50.236 [main-EventThread] ERROR
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState -
Authentication failed
JobManager Web Interface: http://lo-t-work3:8081
The Flink Yarn cluster has failed.
10:56:08.877 [main-EventThread] ERROR
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState -
Authentication failed


What is causing this?


Re: Unsubscribe

2021-12-03 Thread su wenwen
To unsubscribe, send an email to user-zh-unsubscr...@flink.apache.org


From: ™薇维苿尉℃
Sent: December 3, 2021 17:34
To: user-zh
Subject: Unsubscribe

Unsubscribe


[DISCUSS] Change some default config values of blocking shuffle

2021-12-03 Thread Yingjie Cao
Hi dev & users,

We propose to change some default values of blocking shuffle to improve the
out-of-the-box user experience (streaming is not affected). The default values
we want to change are as follows:

1. Data compression
(taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
default value is 'false'.  Usually, data compression can reduce both disk
and network IO which is good for performance. At the same time, it can save
storage space. We propose to change the default value to true.

2. Default shuffle implementation
(taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
value is 'Integer.MAX_VALUE', which means that by default Flink jobs will
always use hash-shuffle. In fact, for high parallelism, sort-shuffle is
better for both stability and performance. So we propose to reduce the
default value to a suitably smaller one, for example, 128. (We tested 128,
256, 512 and 1024 with TPC-DS, and 128 performed best.)

3. Read buffer of sort-shuffle
(taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
default value is '32M'. Previously, when choosing the default value, both
'32M' and '64M' were OK in tests and we cautiously chose the smaller one.
However, it was recently reported on the mailing list that the default
value is not enough, which caused a buffer request timeout issue. We already
created a ticket to improve the behavior. At the same time, we propose to
increase this default value to '64M', which can also help.

4. Sort buffer size of sort-shuffle
(taskmanager.network.sort-shuffle.min-buffers): Currently, the default
value is '64', which means 64 network buffers (32k per buffer by default).
This default value is quite modest and can limit performance.
We propose to increase this value to a larger one, for example, 512 (the
default TM and network buffer configuration can serve more than 10
result partitions concurrently).
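
To make the sizes in item 4 concrete, the arithmetic can be worked out in a small Python sketch. It only multiplies the buffer counts by the 32k per-buffer size quoted above; the helper name `sort_buffer_mib` is hypothetical, and treating the value as a per-result-partition requirement is an assumption based on the wording of the proposal.

```python
# Per-buffer size quoted in the proposal ("32k per buffer by default").
BUFFER_SIZE_KIB = 32


def sort_buffer_mib(num_buffers, buffer_size_kib=BUFFER_SIZE_KIB):
    """Memory in MiB occupied by `num_buffers` network buffers."""
    return num_buffers * buffer_size_kib / 1024


# Current and proposed values of taskmanager.network.sort-shuffle.min-buffers.
current_mib = sort_buffer_mib(64)
proposed_mib = sort_buffer_mib(512)

print(f"current default:  {current_mib} MiB")
print(f"proposed default: {proposed_mib} MiB")
# Assumption: each concurrently written result partition needs its own buffers.
print(f"10 result partitions at the proposed default: {10 * proposed_mib} MiB")
```

Under these assumptions the current default corresponds to 2 MiB of sort buffer and the proposed one to 16 MiB, so ten concurrent result partitions would need about 160 MiB of network memory.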

We already tested these default values together with the TPC-DS benchmark
in a cluster, and both performance and stability improved a lot. These
changes can help to improve the out-of-the-box experience of blocking
shuffle. What do you think about these changes? Are there any concerns? If
there are no objections, I will make these changes soon.

Best,
Yingjie