Re: flink-netty-shuffle files filling up the node disks

2022-02-15 by Yingjie Cao
What exact error is reported when the disk fills up? Is it running out of inodes or out of disk space? As I
understand it, flink-netty-shuffle-b71f58a6-0fdb-437f-a6b8-6b0bbedd3dfa is a directory: is it that too many of
these directories are left behind without being cleaned up, exhausting the inodes, or that the files under this
directory are not cleaned up, so the disk space itself is used up? And if the job is stopped, does the space come
back (in other words, does the job genuinely need this much disk, or is there a leak, with the data still there
even after the job is stopped)?
Also, which Flink version is the job using? I would suggest enabling data compression: if the job really does need
a lot of disk space, compression should help save some of it. In addition, the default implementation is hash
shuffle, which creates many files, consumes many inodes, and can easily lead to "insufficient disk space" errors.
There is a document you can refer to:
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/batch/blocking_shuffle/
. That document has not yet been updated to the latest content (it will probably be updated tomorrow); if this is
urgent, you can also refer directly to the description in the source repository:
https://github.com/apache/flink/blob/master/docs/content.zh/docs/ops/batch/blocking_shuffle.md
.
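A minimal sketch of what enabling these two knobs could look like for a local debug run, assuming Flink 1.12-1.14
key names and the Flink 1.12+ StreamExecutionEnvironment.getExecutionEnvironment(Configuration) variant (on a real
cluster these keys normally go into flink-conf.yaml rather than job code):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// compress blocking shuffle data so it takes less disk space
conf.setString("taskmanager.network.blocking-shuffle.compression.enabled", "true");
// use sort shuffle for every parallelism so far fewer files (and inodes) are created
conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "1");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

If I remember correctly, the flink-netty-shuffle-* directories are created under the TaskManager temp directories
configured by io.tmp.dirs, which defaults to the system temp directory (hence AppData\Local\Temp filling up when
running locally), so pointing io.tmp.dirs at a larger volume is another option worth considering.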

Bai XiangHui (Intelligent Platform) wrote on Mon, Feb 14, 2022 at 17:49:

> Hi everyone,
> Running the code below fills up the disks on all nodes; when debugging locally, the C: drive fills up as well.
> File name: flink-netty-shuffle-b71f58a6-0fdb-437f-a6b8-6b0bbedd3dfa
> Notes:
>
> 1. Batch execution mode.
> 2. In local tests, the input directories oneDay and long are about 1 GB in size. After the program starts, it
> fills up the remaining tens of GB on the C: drive (C:\Users\xxx\AppData\Local\Temp); after deployment to the
> cluster, it gradually fills up the disks on every node as well.
>
>
> 3. The broadcast stream blackListStream has roughly 10,000 records. I tried commenting out the code that reads
> the broadcast state in the process function and the processBroadcastElement method, but it did not help.
>
>
>
> String oneDayLogFile = "C:\\Users\\xianghuibai\\Desktop\\oneDay";
> String historyFileName = "C:\\Users\\xianghuibai\\Desktop\\long";
>
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
> DataStream<String> blackListStream =
>         env.fromCollection(RedisPool20484Utils.getCustomJedisCluster().smembers("user_blacklist_cid_test"));
>
> MapStateDescriptor<String, Boolean> type =
>         new MapStateDescriptor<>("blackList_type",
>                 BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
> BroadcastStream<String> blackList_b = blackListStream.broadcast(type);
>
> DataStream<Tuple5<String, String, String, String, String>> oneDayLog = env.readTextFile(oneDayLogFile)
>         .map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
>             @Override
>             public Tuple5<String, String, String, String, String> map(String line) throws Exception {
>                 String[] arrs = line.split("\t");
>                 return new Tuple5<>(arrs[0], arrs[1], arrs[2], arrs[3], arrs[4]);
>             }
>         });
>
> SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> dayOutput = env.readTextFile(historyFileName)
>         .flatMap(new FlatParseLong())
>         .union(oneDayLog)
>         .connect(blackList_b)
>         .process(new BroadcastProcessFunction<Tuple5<String, String, String, String, String>, String, Tuple5<String, String, String, String, String>>() {
>             private transient ReadOnlyBroadcastState<String, Boolean> broadcastState;
>
>             @Override
>             public void processElement(Tuple5<String, String, String, String, String> value, ReadOnlyContext ctx,
>                     Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
>                 if (broadcastState == null) {
>                     broadcastState = ctx.getBroadcastState(type);
>                 }
>                 if (value != null && !broadcastState.contains(value.f0)) {
>                     out.collect(value);
>                 }
>             }
>
>             @Override
>             public void processBroadcastElement(String value, Context ctx,
>                     Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
>                 if (StringUtils.isNotEmpty(value)) {
>                     BroadcastState<String, Boolean> broadcastState = ctx.getBroadcastState(type);
>                     broadcastState.put(value, true);
>                 }
>             }
>         });
>
>


Re: [DISCUSS] Change some default config values of blocking shuffle

2022-01-06 by Yingjie Cao
Hi all,

Thanks very much for all of the feedback. It seems that we have reached a
consensus. I will start a vote soon.

Best,
Yingjie

Yun Gao wrote on Wed, Jan 5, 2022 at 16:08:

> Many thanks @Yingjie for completing the experiments!
>
> Also +1 for changing the default config values. From the experiments,
> changing the default config values would greatly improve the out-of-box
> experience of Flink batch, so it seems worth changing from my side
> even if it causes some compatibility issues under some scenarios. In
> addition, if we finally have to break compatibility, we might as well do it
> early to avoid affecting more users.
>
> Best,
> Yun
>
> --
> From: Liu Jiangang
> Send Time:2022 Jan. 4 (Tue.) 20:43
> To:user-zh 
> Cc:dev ; user 
> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>
> Thanks for the experiment. +1 for the changes.
>
Yingjie Cao wrote on Tue, Jan 4, 2022 at 17:35:
>
> > Hi all,
> >
> > After running some tests with the proposed default values (
> > taskmanager.network.sort-shuffle.min-parallelism: 1,
> > taskmanager.network.sort-shuffle.min-buffers: 512,
> > taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> > taskmanager.network.blocking-shuffle.compression.enabled: true), I'd like
> > to share some test results.
> >
> > 1. TPC-DS performance and stability test (I ran the TPC-DS benchmark with
> > 512 default parallelism and several different settings multiple times):
> > 1) Stability:
> > Compared to the current default values, the proposed default values can
> > improve TPC-DS query stability a lot. With the current defaults, many
> > queries suffer from blocking-shuffle-related failures. With the proposed
> > default values, only three queries fail because of the "Insufficient
> > number of network buffers:" error. With 512 parallelism, the current
> > default configuration hits the same issue. Part of the reason is that the
> > network buffers consumed by an InputGate are proportional to the
> > parallelism and can use 32M of network memory by default, and many tasks
> > have several InputGates, but we only have 128M of network memory per
> > TaskManager by default.
> > 2) Performance:
> > Compared to the current default values, the proposed default values can
> > improve TPC-DS query performance a lot. Except for queries with a small
> > amount of shuffle data, which take a really short time, the proposed
> > default values can bring a 2-10x performance gain. Regarding the default
> > value of taskmanager.network.sort-shuffle.min-parallelism proposed by
> > Yun, I tested both 1 and 128, and 1 is better for performance, which is
> > as expected.
> >
> > 2. Flink pre-commit stability test:
> > I have run all Flink tests with the proposed default values more than
> > 20 times. The only instability is the "Insufficient number of network
> > buffers:" error for several batch test cases. This error occurs because
> > some tests have really limited network buffers and the proposed default
> > config values may increase the network buffer consumption for some cases.
> > After increasing the total network memory size for these test cases, the
> > issue can be solved.
> >
> > Summary:
> > 1. The proposed default values can improve both the performance and
> > stability of Flink batch shuffle a lot.
> > 2. Some batch jobs may fail with the "Insufficient number of network
> > buffers:" error because this default value change will slightly increase
> > the network buffer consumption for jobs with parallelism less than 512
> > (for jobs with parallelism above 512, network buffer consumption will be
> > reduced).
> > 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 gives
> > better performance than setting it to 128; both settings may incur the
> > "Insufficient number of network buffers:" error.
> > 4. After changing the default values and fixing several test cases, all
> > Flink tests (except for the known unstable cases) can run stably.
> >
> > Personally, I am +1 to make the change. Though the change may cause some
> > batch jobs to fail with the "Insufficient number of network buffers:"
> > error, the possibility is small enough (only 3 out of about 100 TPC-DS
> > queries fail, and these queries also fail with the current default
> > configuration because it is the InputGate which takes most of the network
> > buffers and causes the error). Compared to this small regression, the
> > performance and stability ga

Re: [DISCUSS] Change some default config values of blocking shuffle

2022-01-04 by Yingjie Cao
Hi all,

After running some tests with the proposed default values (
taskmanager.network.sort-shuffle.min-parallelism: 1,
taskmanager.network.sort-shuffle.min-buffers: 512,
taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
taskmanager.network.blocking-shuffle.compression.enabled: true), I'd like to
share some test results.

1. TPC-DS performance and stability test (I ran the TPC-DS benchmark with 512
default parallelism and several different settings multiple times):
1) Stability:
Compared to the current default values, the proposed default values can
improve TPC-DS query stability a lot. With the current defaults, many queries
suffer from blocking-shuffle-related failures. With the proposed default
values, only three queries fail because of the "Insufficient number of
network buffers:" error. With 512 parallelism, the current default
configuration hits the same issue. Part of the reason is that the network
buffers consumed by an InputGate are proportional to the parallelism and can
use 32M of network memory by default, and many tasks have several InputGates,
but we only have 128M of network memory per TaskManager by default.
2) Performance:
Compared to the current default values, the proposed default values can
improve TPC-DS query performance a lot. Except for queries with a small
amount of shuffle data, which take a really short time, the proposed default
values can bring a 2-10x performance gain. Regarding the default value of
taskmanager.network.sort-shuffle.min-parallelism proposed by Yun, I tested
both 1 and 128, and 1 is better for performance, which is as expected.

2. Flink pre-commit stability test:
I have run all Flink tests with the proposed default values more than 20
times. The only instability is the "Insufficient number of network buffers:"
error for several batch test cases. This error occurs because some tests have
really limited network buffers and the proposed default config values may
increase the network buffer consumption for some cases. After increasing the
total network memory size for these test cases, the issue can be solved.

Summary:
1. The proposed default values can improve both the performance and
stability of Flink batch shuffle a lot.
2. Some batch jobs may fail with the "Insufficient number of network
buffers:" error because this default value change will slightly increase the
network buffer consumption for jobs with parallelism less than 512 (for jobs
with parallelism above 512, network buffer consumption will be reduced).
3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 gives better
performance than setting it to 128; both settings may incur the "Insufficient
number of network buffers:" error.
4. After changing the default values and fixing several test cases, all
Flink tests (except for the known unstable cases) can run stably.

Personally, I am +1 to make the change. Though the change may cause some
batch jobs to fail with the "Insufficient number of network buffers:" error,
the possibility is small enough (only 3 out of about 100 TPC-DS queries fail,
and these queries also fail with the current default configuration because it
is the InputGate which takes most of the network buffers and causes the
error). Compared to this small regression, the performance and stability
gains are big. Any feedback, especially from Flink batch users, is highly
appreciated.

BTW, aside from the above tests, I also tried to tune some more config
options to make the TPC-DS test faster. I copied these tuned config options
from our daily TPC-DS settings. The results show that the optimized
configuration can improve TPC-DS performance by about 30%. Though these
settings may not be the best, they really help compared to the default
values. I attached some settings in this mail; I guess some Flink batch users
may be interested in this. Based on my limited knowledge, I guess that
increasing the total TaskManager size and network memory size is important
for performance, because more memory (managed and network) can make operators
and shuffle faster.
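As a purely illustrative sketch of that last point (the numbers below are invented for the example and would need
to be sized to the actual cluster and job), the relevant flink-conf.yaml entries are along these lines:

taskmanager.memory.process.size: 8g
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.max: 2g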

Best,
Yingjie



Yingjie Cao wrote on Wed, Dec 15, 2021 at 12:19:

> Hi Till,
>
> Thanks for the suggestion. I think it makes a lot of sense to also extend
> the documentation for the sort shuffle to include a tuning guide.
>
> Best,
> Yingjie
>
> Till Rohrmann wrote on Tue, Dec 14, 2021 at 18:57:
>
>> As part of this FLIP, does it make sense to also extend the documentation
>> for the sort shuffle [1] to include a tuning guide? I am thinking of a more
>> in depth description of what things you might observe and how to influence
>> them with the configuration options.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>>
>> Cheers,
>> Till
>>
>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li 
>> wrote:
>>
>>> Hi Yingjie,

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-14 by Yingjie Cao
Hi Till,

Thanks for the suggestion. I think it makes a lot of sense to also extend
the documentation for the sort shuffle to include a tuning guide.

Best,
Yingjie

Till Rohrmann wrote on Tue, Dec 14, 2021 at 18:57:

> As part of this FLIP, does it make sense to also extend the documentation
> for the sort shuffle [1] to include a tuning guide? I am thinking of a more
> in depth description of what things you might observe and how to influence
> them with the configuration options.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>
> Cheers,
> Till
>
> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li 
> wrote:
>
>> Hi Yingjie,
>>
>> Thanks for your explanation. I have no more questions. +1
>>
>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao 
>> wrote:
>> >
>> > Hi Jingsong,
>> >
>> > Thanks for your feedback.
>> >
>> > >>> My question is, what is the maximum parallelism a job can have with
>> the default configuration? (Does this break out of the box)
>> >
>> > Yes, you are right, these two options are related to network memory and
>> framework off-heap memory. Generally, these changes will not break out of
>> the box experience, but for some extreme cases, for example, there are too
>> many ResultPartitions per task, users may need to increase network memory
>> to avoid "insufficient network buffer" error. For framework off-heap, I
>> believe that users do not need to change the default value.
>> >
>> > In fact, I have a basic goal when changing these config values: when
>> running TPCDS of medium parallelism with the default value, all queries
>> must pass without any error and at the same time, the performance can be
>> improved. I think if we achieve this goal, most common use cases can be
>> covered.
>> >
>> > Currently, for the default configuration, the exclusive buffers
>> required at input gate side is still parallelism relevant (though since
>> 1.14, we can decouple the network buffer consumption from parallelism by
>> setting a config value, it has slight performance influence on streaming
>> jobs), which means that no large parallelism can be supported by the
>> default configuration. Roughly, I would say the default value can support
>> jobs of several hundreds of parallelism.
>> >
>> > >>> I do feel that this correspondence is a bit difficult to control at
>> the moment, and it would be best if a rough table could be provided.
>> >
>> > I think this is a good suggestion, we can provide those suggestions in
>> the document.
>> >
>> > Best,
>> > Yingjie
>> >
>> > Jingsong Li wrote on Tue, Dec 14, 2021 at 14:39:
>> >>
>> >> Hi  Yingjie,
>> >>
>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>> >> of batch jobs.
>> >>
>> >> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>> >> network memory and framework.off-heap.size.
>> >>
>> >> My question is, what is the maximum parallelism a job can have with
>> >> the default configuration? (Does this break out of the box)
>> >>
>> >> How much network memory and framework.off-heap.size are required for
>> >> how much parallelism in the default configuration?
>> >>
>> >> I do feel that this correspondence is a bit difficult to control at
>> >> the moment, and it would be best if a rough table could be provided.
>> >>
>> >> Best,
>> >> Jingsong
>> >>
>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao 
>> wrote:
>> >> >
>> >> > Hi Jiangang,
>> >> >
>> >> > Thanks for your suggestion.
>> >> >
>> >> > >>> The config can affect the memory usage. Will the related memory
>> configs be changed?
>> >> >
>> >> > I think we will not change the default network memory settings. My
>> best expectation is that the default value can work for most cases (though
>> may not the best) and for other cases, user may need to tune the memory
>> settings.
>> >> >
>> >> > >>> Can you share the tpcds results for different configs? Although
>> we change the default values, it is helpful to change them for different
>> users. In t

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 by Yingjie Cao
Hi Jingsong,

Thanks for your feedback.

>>> My question is, what is the maximum parallelism a job can have with the
default configuration? (Does this break out of the box)

Yes, you are right, these two options are related to network memory and
framework off-heap memory. Generally, these changes will not break the
out-of-box experience, but in some extreme cases, for example, when there are
too many ResultPartitions per task, users may need to increase the network
memory to avoid the "insufficient network buffer" error. For framework
off-heap, I believe that users do not need to change the default value.

In fact, I have a basic goal when changing these config values: when
running TPC-DS at medium parallelism with the default values, all queries
must pass without any error and, at the same time, the performance should be
improved. I think if we achieve this goal, most common use cases can be
covered.

Currently, with the default configuration, the number of exclusive buffers
required at the input gate side is still parallelism relevant (though since
1.14 we can decouple the network buffer consumption from parallelism by
setting a config value, it has a slight performance influence on streaming
jobs), which means that very large parallelism cannot be supported by the
default configuration. Roughly, I would say the default values can support
jobs of several hundred parallelism.

>>> I do feel that this correspondence is a bit difficult to control at the
moment, and it would be best if a rough table could be provided.

I think this is a good suggestion; we can provide those suggestions in the
document.

Best,
Yingjie

Jingsong Li wrote on Tue, Dec 14, 2021 at 14:39:

> Hi  Yingjie,
>
> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
> of batch jobs.
>
> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
> and "taskmanager.network.sort-shuffle.min-buffers" are related to
> network memory and framework.off-heap.size.
>
> My question is, what is the maximum parallelism a job can have with
> the default configuration? (Does this break out of the box)
>
> How much network memory and framework.off-heap.size are required for
> how much parallelism in the default configuration?
>
> I do feel that this correspondence is a bit difficult to control at
> the moment, and it would be best if a rough table could be provided.
>
> Best,
> Jingsong
>
> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao 
> wrote:
> >
> > Hi Jiangang,
> >
> > Thanks for your suggestion.
> >
> > >>> The config can affect the memory usage. Will the related memory
> configs be changed?
> >
> > I think we will not change the default network memory settings. My best
> expectation is that the default value can work for most cases (though may
> not the best) and for other cases, user may need to tune the memory
> settings.
> >
> > >>> Can you share the tpcds results for different configs? Although we
> change the default values, it is helpful to change them for different
> users. In this case, the experience can help a lot.
> >
> > I did not keep all previous TPCDS results, but from the results, I can
> tell that on HDD, always using the sort-shuffle is a good choice. For small
> jobs, using sort-shuffle may not bring much performance gain; this may be
> because all shuffle data can be cached in memory (page cache), which is
> the case if the cluster has enough resources. However, if the whole
> cluster is under heavy load or you are running large-scale jobs, the
> performance of those small jobs can also be influenced. For large-scale
> jobs, the configurations suggested to be tuned are
> taskmanager.network.sort-shuffle.min-buffers and
> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase
> these values for large-scale batch jobs.
> >
> > BTW, I am still running TPCDS tests these days and I can share these
> results soon.
> >
> > Best,
> > Yingjie
> >
> > Liu Jiangang wrote on Fri, Dec 10, 2021 at 18:30:
> >>
> >> Glad to see the suggestion. In our test, we found that, just as in your
> >> test, the changed configs do not improve the performance much for small
> >> jobs. I have some suggestions:
> >>
> >> The config can affect the memory usage. Will the related memory configs
> be changed?
> >> Can you share the tpcds results for different configs? Although we
> change the default values, it is helpful to change them for different
> users. In this case, the experience can help a lot.
> >>
> >> Best,
> >> Liu Jiangang
> >>
> >> Yun Gao wrote on Fri, Dec 10, 2021 at 17:20:
> >>>
> >>> Hi Yingjie,
> >>>
> >>> Very thanks for draftin

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 by Yingjie Cao
Hi Jiangang,

Thanks for your suggestion.

>>> The config can affect the memory usage. Will the related memory configs
be changed?

I think we will not change the default network memory settings. My best
expectation is that the default values can work for most cases (though maybe
not the best) and that for other cases, users may need to tune the memory
settings.

>>> Can you share the tpcds results for different configs? Although we
change the default values, it is helpful to change them for different
users. In this case, the experience can help a lot.

I did not keep all previous TPC-DS results, but from the results I can tell
that on HDD, always using the sort-shuffle is a good choice. For small
jobs, using sort-shuffle may not bring much performance gain; this may be
because all shuffle data can be cached in memory (page cache), which is
the case if the cluster has enough resources. However, if the whole
cluster is under heavy load or you are running large-scale jobs, the
performance of those small jobs can also be influenced. For large-scale
jobs, the configurations suggested to be tuned are
taskmanager.network.sort-shuffle.min-buffers and
taskmanager.memory.framework.off-heap.batch-shuffle.size; you can increase
these values for large-scale batch jobs.
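For example (the values below are only illustrative, not recommendations from this thread; they must fit within
the TaskManager's network memory and framework off-heap memory budgets), the corresponding flink-conf.yaml entries
might look like:

taskmanager.network.sort-shuffle.min-buffers: 2048
taskmanager.memory.framework.off-heap.batch-shuffle.size: 256m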

BTW, I am still running TPCDS tests these days and I can share these
results soon.

Best,
Yingjie

Liu Jiangang wrote on Fri, Dec 10, 2021 at 18:30:

> Glad to see the suggestion. In our test, we found that, just as in your
> test, the changed configs do not improve the performance much for small
> jobs. I have some suggestions:
>
>- The config can affect the memory usage. Will the related memory
>configs be changed?
>- Can you share the tpcds results for different configs? Although we
>change the default values, it is helpful to change them for different
>users. In this case, the experience can help a lot.
>
> Best,
> Liu Jiangang
>
> Yun Gao wrote on Fri, Dec 10, 2021 at 17:20:
>
>> Hi Yingjie,
>>
>> Many thanks for drafting the FLIP and initiating the discussion!
>>
>> May I double-check one point about
>> taskmanager.network.sort-shuffle.min-parallelism: since other frameworks
>> like Spark use sort-based shuffle for all cases, is our current situation
>> still different from theirs?
>>
>> Best,
>> Yun
>>
>>
>>
>>
>> --
>> From:Yingjie Cao 
>> Send Time:2021 Dec. 10 (Fri.) 16:17
>> To:dev ; user ; user-zh <
>> user-zh@flink.apache.org>
>> Subject:Re: [DISCUSS] Change some default config values of blocking
>> shuffle
>>
>> Hi dev & users:
>>
>> I have created a FLIP [1] for it, feedbacks are highly appreciated.
>>
>> Best,
>> Yingjie
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>> Yingjie Cao wrote on Fri, Dec 3, 2021 at 17:02:
>>
>> Hi dev & users,
>>
>> We propose to change some default values of blocking shuffle to improve
>> the user out-of-box experience (not influence streaming). The default
>> values we want to change are as follows:
>>
>> 1. Data compression
>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>> default value is 'false'.  Usually, data compression can reduce both disk
>> and network IO which is good for performance. At the same time, it can save
>> storage space. We propose to change the default value to true.
>>
>> 2. Default shuffle implementation
>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>> value is 'Integer.MAX', which means by default, Flink jobs will always use
>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>> both stability and performance. So we propose to reduce the default value
>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
>> 1024 with a tpc-ds and 128 is the best one.)
>>
>> 3. Read buffer of sort-shuffle
>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>> default value is '32M'. Previously, when choosing the default value, both
>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
>> way. However, recently, it is reported in the mailing list that the default
>> value is not enough which caused a buffer request timeout issue. We already
>> created a ticket to improve the behavior. At the same time, we propose to
>> increase this default value to '64M' which can also help.
>>
>> 4. Sort buffer size of sort-shuffle
>>

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 by Yingjie Cao
Hi Yun,

Thanks for your feedback.

I think setting taskmanager.network.sort-shuffle.min-parallelism to 1 and
using sort-shuffle for all cases by default is a good suggestion. I did not
choose this value mainly for two reasons:

1. The first one is that it increases the usage of network memory, which may
cause the "insufficient network buffer" exception, and users may have to
increase the total network buffers.
2. There are several (not many) TPC-DS queries that suffer some performance
regression on SSD.

For the first issue, I will test more settings on TPC-DS and see the
influence. For the second issue, I will try to find the cause and solve it
in 1.15.

I am open to your suggestion, but I still need some more tests and
analysis to guarantee that it works well.

Best,
Yingjie

Yun Gao wrote on Fri, Dec 10, 2021 at 17:19:

> Hi Yingjie,
>
> Many thanks for drafting the FLIP and initiating the discussion!
>
> May I double-check one point about
> taskmanager.network.sort-shuffle.min-parallelism: since other frameworks
> like Spark use sort-based shuffle for all cases, is our current situation
> still different from theirs?
>
> Best,
> Yun
>
>
>
> --
> From:Yingjie Cao 
> Send Time:2021 Dec. 10 (Fri.) 16:17
> To:dev ; user ; user-zh <
> user-zh@flink.apache.org>
> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>
> Hi dev & users:
>
> I have created a FLIP [1] for it, feedbacks are highly appreciated.
>
> Best,
> Yingjie
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>
> Yingjie Cao wrote on Fri, Dec 3, 2021 at 17:02:
> Hi dev & users,
>
> We propose to change some default values of blocking shuffle to improve
> the user out-of-box experience (not influence streaming). The default
> values we want to change are as follows:
>
> 1. Data compression
> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
> default value is 'false'.  Usually, data compression can reduce both disk
> and network IO which is good for performance. At the same time, it can save
> storage space. We propose to change the default value to true.
>
> 2. Default shuffle implementation
> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
> value is 'Integer.MAX', which means by default, Flink jobs will always use
> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
> both stability and performance. So we propose to reduce the default value
> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
> 1024 with a tpc-ds and 128 is the best one.)
>
> 3. Read buffer of sort-shuffle
> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
> default value is '32M'. Previously, when choosing the default value, both
> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
> way. However, recently, it is reported in the mailing list that the default
> value is not enough which caused a buffer request timeout issue. We already
> created a ticket to improve the behavior. At the same time, we propose to
> increase this default value to '64M' which can also help.
>
> 4. Sort buffer size of sort-shuffle
> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> value is '64' which means '64' network buffers (32k per buffer by default).
> This default value is quite modest and the performance can be influenced.
> We propose to increase this value to a larger one, for example, 512 (the
> default TM and network buffer configuration can serve more than 10
> result partitions concurrently).
>
> We already tested these default values together with tpc-ds benchmark in a
> cluster and both the performance and stability improved a lot. These
> changes can help to improve the out-of-box experience of blocking shuffle.
> What do you think about these changes? Is there any concern? If there are
> no objections, I will make these changes soon.
>
> Best,
> Yingjie
>
>
>


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-10 by Yingjie Cao
Hi dev & users:

I have created a FLIP [1] for it; feedback is highly appreciated.

Best,
Yingjie

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability

Yingjie Cao wrote on Fri, Dec 3, 2021 at 17:02:

> Hi dev & users,
>
> We propose to change some default values of blocking shuffle to improve
> the user out-of-box experience (not influence streaming). The default
> values we want to change are as follows:
>
> 1. Data compression
> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
> default value is 'false'.  Usually, data compression can reduce both disk
> and network IO which is good for performance. At the same time, it can save
> storage space. We propose to change the default value to true.
>
> 2. Default shuffle implementation
> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
> value is 'Integer.MAX', which means by default, Flink jobs will always use
> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
> both stability and performance. So we propose to reduce the default value
> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
> 1024 with a tpc-ds and 128 is the best one.)
>
> 3. Read buffer of sort-shuffle
> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
> default value is '32M'. Previously, when choosing the default value, both
> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
> way. However, recently, it is reported in the mailing list that the default
> value is not enough which caused a buffer request timeout issue. We already
> created a ticket to improve the behavior. At the same time, we propose to
> increase this default value to '64M' which can also help.
>
> 4. Sort buffer size of sort-shuffle
> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> value is '64' which means '64' network buffers (32k per buffer by default).
> This default value is quite modest and the performance can be influenced.
> We propose to increase this value to a larger one, for example, 512 (the
> default TM and network buffer configuration can serve more than 10
> result partitions concurrently).
>
> We already tested these default values together with tpc-ds benchmark in a
> cluster and both the performance and stability improved a lot. These
> changes can help to improve the out-of-box experience of blocking shuffle.
> What do you think about these changes? Is there any concern? If there are
> no objections, I will make these changes soon.
>
> Best,
> Yingjie
>


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-06 by Yingjie Cao
Hi Till,

Thanks for your feedback.

>>> How will our tests be affected by these changes? Will Flink require
more resources and, thus, will it risk destabilizing our testing
infrastructure?

There are some tests that need to be adjusted, for example,
BlockingShuffleITCase. For other tests, theoretically, the influence should
be small. I will further run all tests multiple times (like 10 or 20) to
ensure that there are no test stability issues before making the change.

>>> I would propose to create a FLIP for these changes since you propose to
change the default behaviour. It can be a very short one, though.

Yes, you are right. I will prepare a simple FLIP soon.

Best,
Yingjie


Till Rohrmann wrote on Fri, Dec 3, 2021 at 18:39:

> Thanks for starting this discussion Yingjie,
>
> How will our tests be affected by these changes? Will Flink require more
> resources and, thus, will it risk destabilizing our testing infrastructure?
>
> I would propose to create a FLIP for these changes since you propose to
> change the default behaviour. It can be a very short one, though.
>
> Cheers,
> Till
>
> On Fri, Dec 3, 2021 at 10:02 AM Yingjie Cao 
> wrote:
>
>> Hi dev & users,
>>
>> We propose to change some default values of blocking shuffle to improve
>> the user out-of-box experience (not influence streaming). The default
>> values we want to change are as follows:
>>
>> 1. Data compression
>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>> default value is 'false'.  Usually, data compression can reduce both disk
>> and network IO which is good for performance. At the same time, it can save
>> storage space. We propose to change the default value to true.
>>
>> 2. Default shuffle implementation
>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>> value is 'Integer.MAX', which means by default, Flink jobs will always use
>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>> both stability and performance. So we propose to reduce the default value
>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
>> 1024 with a tpc-ds and 128 is the best one.)
>>
>> 3. Read buffer of sort-shuffle
>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>> default value is '32M'. Previously, when choosing the default value, both
>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
>> way. However, recently, it is reported in the mailing list that the default
>> value is not enough which caused a buffer request timeout issue. We already
>> created a ticket to improve the behavior. At the same time, we propose to
>> increase this default value to '64M' which can also help.
>>
>> 4. Sort buffer size of sort-shuffle
>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
>> value is '64' which means '64' network buffers (32k per buffer by default).
>> This default value is quite modest and the performance can be influenced.
>> We propose to increase this value to a larger one, for example, 512 (the
>> default TM and network buffer configuration can serve more than 10
>> result partitions concurrently).
>>
>> We already tested these default values together with tpc-ds benchmark in
>> a cluster and both the performance and stability improved a lot. These
>> changes can help to improve the out-of-box experience of blocking shuffle.
>> What do you think about these changes? Is there any concern? If there are
>> no objections, I will make these changes soon.
>>
>> Best,
>> Yingjie
>>
>


[DISCUSS] Change some default config values of blocking shuffle

2021-12-03 by Yingjie Cao
Hi dev & users,

We propose to change some default values of blocking shuffle to improve the
out-of-box experience for users (streaming is not influenced). The default values we
want to change are as follows:

1. Data compression
(taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
default value is 'false'.  Usually, data compression can reduce both disk
and network IO which is good for performance. At the same time, it can save
storage space. We propose to change the default value to true.

2. Default shuffle implementation
(taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
value is 'Integer.MAX', which means by default, Flink jobs will always use
hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
both stability and performance. So we propose to reduce the default value
to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
1024 with TPC-DS, and 128 is the best one.)

3. Read buffer of sort-shuffle
(taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
default value is '32M'. Previously, when choosing the default value, both
'32M' and '64M' were OK for the tests and we chose the smaller one in a
cautious way. However, it was recently reported on the mailing list that the
default value is not large enough, which caused a buffer request timeout
issue. We already
created a ticket to improve the behavior. At the same time, we propose to
increase this default value to '64M' which can also help.

4. Sort buffer size of sort-shuffle
(taskmanager.network.sort-shuffle.min-buffers): Currently, the default
value is '64' which means '64' network buffers (32k per buffer by default).
This default value is quite modest and the performance can be influenced.
We propose to increase this value to a larger one, for example, 512 (the
default TM and network buffer configuration can serve more than 10
result partitions concurrently).
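Put together, the four proposed defaults correspond to the following flink-conf.yaml entries (listed here only as
a summary of points 1-4 above, using 128 as the example value from point 2):

taskmanager.network.blocking-shuffle.compression.enabled: true
taskmanager.network.sort-shuffle.min-parallelism: 128
taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m
taskmanager.network.sort-shuffle.min-buffers: 512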

We already tested these default values together with the TPC-DS benchmark in a
cluster and both the performance and stability improved a lot. These
changes can help to improve the out-of-box experience of blocking shuffle.
What do you think about these changes? Is there any concern? If there are
no objections, I will make these changes soon.

Best,
Yingjie


Re: Can someone explain the differences between the GlobalStreamExchangeMode exchange modes and their use cases?

2021-12-01 by Yingjie Cao
This enum directly controls how the internal job edges are connected; you can refer to its Javadoc below. However,
it is an internal interface, so the recommendation is still to use
env.setRuntimeMode(RuntimeExecutionMode.BATCH); see this document:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/
.

public enum GlobalStreamExchangeMode {
/** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */
ALL_EDGES_BLOCKING,

/**
 * Set job edges with {@link ForwardPartitioner} to be {@link
 * ResultPartitionType#PIPELINED_BOUNDED} and other edges to be {@link
 * ResultPartitionType#BLOCKING}.
 */
FORWARD_EDGES_PIPELINED,

/**
 * Set job edges with {@link ForwardPartitioner} or {@link
RescalePartitioner} to be {@link
 * ResultPartitionType#PIPELINED_BOUNDED} and other edges to be {@link
 * ResultPartitionType#BLOCKING}.
 */
POINTWISE_EDGES_PIPELINED,

/** Set all job edges {@link ResultPartitionType#PIPELINED_BOUNDED}. */
ALL_EDGES_PIPELINED,

/** Set all job edges {@link ResultPartitionType#PIPELINED_APPROXIMATE}. */
ALL_EDGES_PIPELINED_APPROXIMATE
}
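For reference, a minimal sketch of the recommended public API mentioned above (Flink 1.12+), instead of wiring
GlobalStreamExchangeMode directly:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// run the bounded job in batch mode, which uses blocking (batch) data exchanges
env.setRuntimeMode(RuntimeExecutionMode.BATCH);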


casel.chen wrote on Thu, Dec 2, 2021 at 8:26 AM:

> What are the differences between these GlobalStreamExchangeMode exchange modes and what are their use cases?
> Which ones are suitable for streaming jobs and which for batch jobs?
> Does the release of the Flink Remote Shuffle Service mean that Flink can now be used in production for batch
> jobs? Thanks!
>
>
> package org.apache.flink.streaming.api.graph;
>
>
>
>
> import org.apache.flink.annotation.Internal;
>
>
>
>
> @Internal
>
> public enum GlobalStreamExchangeMode {
>
> ALL_EDGES_BLOCKING,
>
> FORWARD_EDGES_PIPELINED,
>
> POINTWISE_EDGES_PIPELINED,
>
> ALL_EDGES_PIPELINED,
>
> ALL_EDGES_PIPELINED_APPROXIMATE;
>
>
>
>
> private GlobalStreamExchangeMode() {
>
> }
>
> }
>
>
>


Re: Error when running the flink remote shuffle example

2021-12-01 by Yingjie Cao
It looks like flink-streaming-java is not on the class path. If you use a Flink distribution downloaded from the
official website and submit the job by running ./bin/flink run directly under FLINK_HOME, this error should not
occur. Also, as I understand it, this error itself has little to do with remote shuffle; it would also be thrown
without remote shuffle. My suggestion is to download a Flink release from the official website and try again.


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-12-01 by Yingjie Cao
Hi Jiangang,

Great to hear that, welcome to work together to make the project better.

Best,
Yingjie

Liu Jiangang wrote on Wed, Dec 1, 2021 at 3:27 PM:

> Good work for Flink's batch processing!
> A remote shuffle service can resolve the container-lost problem and reduce
> the running time of batch jobs after a failover. We have investigated the
> component a lot and welcome Flink's native solution. We will try it and
> help improve it.
>
> Thanks,
> Liu Jiangang
>
> Yingjie Cao wrote on Tue, Nov 30, 2021 at 9:33 PM:
>
> > Hi dev & users,
> >
> > We are happy to announce the open source of remote shuffle project [1]
> for
> > Flink. The project originated in Alibaba and the main motivation is to
> > improve batch data processing for both performance & stability and
> further
> > embrace cloud native. For more features about the project, please refer
> to
> > [1].
> >
> > Before going open source, the project has been used widely in production
> > and it behaves well on both stability and performance. We hope you enjoy
> > it. Collaborations and feedbacks are highly appreciated.
> >
> > Best,
> > Yingjie on behalf of all contributors
> >
> > [1] https://github.com/flink-extended/flink-remote-shuffle
> >
>


[ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 by Yingjie Cao
Hi dev & users,

We are happy to announce the open sourcing of the remote shuffle project [1]
for Flink. The project originated in Alibaba and the main motivation is to
improve batch data processing for both performance & stability and to further
embrace cloud native. For more features of the project, please refer to
[1].

Before going open source, the project had been used widely in production
and it behaves well in terms of both stability and performance. We hope you
enjoy it. Collaboration and feedback are highly appreciated.

Best,
Yingjie on behalf of all contributors

[1] https://github.com/flink-extended/flink-remote-shuffle


Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 by Yingjie Cao
Maybe you can try to
increase taskmanager.network.retries,
taskmanager.network.netty.server.backlog and
taskmanager.network.netty.sendReceiveBufferSize. These options are useful
for our jobs.
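For example (the values below are placeholders to show the knobs, not tested recommendations; suitable values
depend on the environment), in flink-conf.yaml:

taskmanager.network.retries: 10
taskmanager.network.netty.server.backlog: 1000
taskmanager.network.netty.sendReceiveBufferSize: 4194304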

yidan zhao wrote on Wed, Jun 16, 2021 at 7:10 PM:

> Hi, yingjie.
> If the network is not stable, which config parameters should I adjust?
>
> yidan zhao wrote on Wed, Jun 16, 2021 at 6:56 PM:
> >
> > 2: I use G1, and no full gc occurred, young gc count: 422, time:
> > 142892, so it is not bad.
> > 3: stream job.
> > 4: I will try to config taskmanager.network.retries which is default
> > 0, and taskmanager.network.netty.client.connectTimeoutSec 's default
> > is 120s。
> > 5: I checked the net fd number of the taskmanager, it is about 1000+,
> > so I think it is a reasonable value.
> >
> > 1: can not be sure.
> >
> > Yingjie Cao wrote on Wed, Jun 16, 2021 at 4:34 PM:
> > >
> > > Hi yidan,
> > >
> > > 1. Is the network stable?
> > > 2. Is there any GC problem?
> > > 3. Is it a batch job? If so, please use sort-shuffle, see [1] for more
> information.
> > > 4. You may try to config these two options:
> taskmanager.network.retries,
> taskmanager.network.netty.client.connectTimeoutSec. More relevant options
> can be found in 'Data Transport Network Stack' section of [2].
> > > 5. If it is not the above cases, it may be related to [3]; you may
> > > need to check the number of TCP connections per TM and node.
> > >
> > > Hope this helps.
> > >
> > > [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
> > > [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
> > > [3] https://issues.apache.org/jira/browse/FLINK-22643
> > >
> > > Best,
> > > Yingjie
> > >
> > >> yidan zhao wrote on Wed, Jun 16, 2021 at 3:36 PM:
> > >>
> > >> Attachment is the exception stack from flink's web-ui. Does anyone
> > >> have also met this problem?
> > >>
> > >> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 containers,
> > >> each 28G mem.
>


Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 by Yingjie Cao
Hi yidan,

1. Is the network stable?
2. Is there any GC problem?
3. Is it a batch job? If so, please use sort-shuffle, see [1] for more
information.
4. You may try to config these two options: taskmanager.network.retries,
taskmanager.network.netty.client.connectTimeoutSec. More relevant options
can be found in 'Data Transport Network Stack' section of [2].
5. If it is not the above cases, it may be related to [3]; you may need to
check the number of TCP connections per TM and node.

Hope this helps.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
[3] https://issues.apache.org/jira/browse/FLINK-22643

Best,
Yingjie

yidan zhao wrote on Wed, Jun 16, 2021 at 3:36 PM:

> Attachment is the exception stack from flink's web-ui. Does anyone
> have also met this problem?
>
> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 containers,
> each 28G mem.
>


Re: With the same job configuration, checkpoints of the Flink 1.12 job take longer and eventually fail, while the Flink 1.9 job runs fine

2021-03-30 by Yingjie Cao
This should not be the effect of FLINK-16404; its impact on checkpoint time is fairly small and is already covered
by a benchmark, and even a 1s checkpoint interval is not a big problem. I would suggest looking at the stack of
the failing task to see what it is doing; that will probably make troubleshooting faster.

Haihang Jing wrote on Wed, Mar 24, 2021 at 12:06 PM:

> [Symptom] For jobs with the same configuration (checkpoint interval: 3 minutes, job logic: regular join),
> Flink 1.9 runs fine, while on Flink 1.12, after running for a while, checkpoints take longer and longer to
> complete and eventually fail.
>
>
> [Analysis] We learned that the checkpoint mechanism was adjusted after Flink 1.10: during barrier alignment the
> receiver no longer buffers the data that arrives after a single barrier, which means the sender must wait for
> credit feedback after barrier alignment before transmitting data, so the sender incurs a certain cold start that
> affects latency and network throughput. We therefore changed the checkpoint interval to 10 minutes for a
> comparison test and found that with this change (interval = 10), the job runs fine on Flink 1.12.
> Related issue: https://issues.apache.org/jira/browse/FLINK-16404
>
> [Questions] 1. Has anyone else run into the same situation?
> 2. How much does the checkpoint interval actually affect a Flink 1.12 job? Is there any official test?
>
> The Flink 1.12 job with a 3-minute checkpoint interval failed to complete checkpoints after running for 5 hours.
> The exception stack:
>
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:96)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1924)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1897)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2038)
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>