Thanks for the experiment. +1 for the changes.

Yingjie Cao <kevin.ying...@gmail.com> 于2022年1月4日周二 17:35写道:

> Hi all,
>
> After running some tests with the proposed default value (
> taskmanager.network.sort-shuffle.min-parallelism: 1,
> taskmanager.network.sort-shuffle.min-buffers: 512,
> taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> taskmanager.network.blocking-shuffle.compression.enabled: true), I'd to
> share some test results.
>
> 1. TPC-DS performance and stability test (I the TPC-DS benchmark using 512
> default parallelism and several different settings multiple times):
> 1) Stability:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query stability a lot. With the current default, there
> are many queries suffering from blocking shuffle relevant failures. With
> the proposed default values, only three queries fail because of the
> "Insufficient number of network buffers:" error. With 512 parallelism, the
> current default configuration will incur the same issue. Part of the reason
> is that the network buffer consumed by InputGate is  proportional to
> parallelism and can use 32M network memory by default and many tasks has
> several InputGate but we only has 128M network memory per TaskManager by
> default.
> 2) Performance:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query performance a lot. Except for those queries of
> small shuffle data amount which consume really short time, the proposed
> default values can bring 2-10 times performance gain. About the default
> value of taskmanager.network.sort-shuffle.min-parallelism  proposed by
> Yun, I tested both 1 and 128 and 1 is better for performance which is as
> expected.
>
> 2. Flink pre-commit stability test:
> I have run all Flink tests with the proposed default value for more than
> 20 times. The only instability is the "Insufficient number of network
> buffers:" error for batch several test cases. This error occurs because
> some tests have really limited network buffers and the proposed default
> config values may increase the network buffer consumption for cases. After
> increase the total network size for these test cases, the issue can be
> solved.
>
> Summary:
> 1. The proposed default value can improve both the performance and
> stability of Flink batch shuffle a lot.
> 2. Some batch jobs may fail because of the "Insufficient number of network
> buffers:" error for this default value change will increase the network
> buffer consumption a little for jobs less than 512 parallelism (for jobs
> more than 512 parallelism network buffer consumption will be reduced).
> 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has
> better performance than setting that to 128, both settings may incur the
> "Insufficient number of network buffers:" error.
> 4. After changing the default value and fixing several test cases, all
> Flink tests (except for those known unstable cases) can run stably.
>
> Personally, I am +1 to make the change. Though the change may cause some
> batch jobs fail because of the "Insufficient number of network buffers:",
> the possibility is small enough (only 3 TPC-DS out of about 100 queries
> fails, these queries will also fail with the current default configuration
> because it is the InputGate which takes the most network buffers and cost
> the error). Compared to this small regression, the performance and
> stability gains are big. Any feedbacks especially those from Flink batch
> users are highly appreciated.
>
> BTW, aside from the above tests, I also tries to tune some more config
> options to try to make the TPC-DS test faster. I copied these tuned config
> options from our daily TPC-DS settings. The results show that the optimized
> configuration can improve the TPC-DS performance about 30%. Though these
> settings may not the best, they really help compared to the default value.
> I attached some settings in this may, I guess some Flink batch users may be
> interested in this. Based on my limited knowledge, I guess that increasing
> the total TaskManager size and network memory size is important for
> performance, because more memory (managed and network) can make operators
> and shuffle faster.
>
> Best,
> Yingjie
>
>
>
> Yingjie Cao <kevin.ying...@gmail.com> 于2021年12月15日周三 12:19写道:
>
>> Hi Till,
>>
>> Thanks for the suggestion. I think it makes a lot of sense to also extend
>> the documentation for the sort shuffle to include a tuning guide.
>>
>> Best,
>> Yingjie
>>
>> Till Rohrmann <trohrm...@apache.org> 于2021年12月14日周二 18:57写道:
>>
>>> As part of this FLIP, does it make sense to also extend the
>>> documentation for the sort shuffle [1] to include a tuning guide? I am
>>> thinking of a more in depth description of what things you might observe
>>> and how to influence them with the configuration options.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li <jingsongl...@gmail.com>
>>> wrote:
>>>
>>>> Hi Yingjie,
>>>>
>>>> Thanks for your explanation. I have no more questions. +1
>>>>
>>>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao <kevin.ying...@gmail.com>
>>>> wrote:
>>>> >
>>>> > Hi Jingsong,
>>>> >
>>>> > Thanks for your feedback.
>>>> >
>>>> > >>> My question is, what is the maximum parallelism a job can have
>>>> with the default configuration? (Does this break out of the box)
>>>> >
>>>> > Yes, you are right, these two options are related to network memory
>>>> and framework off-heap memory. Generally, these changes will not break out
>>>> of the box experience, but for some extreme cases, for example, there are
>>>> too many ResultPartitions per task, users may need to increase network
>>>> memory to avoid "insufficient network buffer" error. For framework
>>>> off-head, I believe that user do not need to change the default value.
>>>> >
>>>> > In fact, I have a basic goal when changing these config values: when
>>>> running TPCDS of medium parallelism with the default value, all queries
>>>> must pass without any error and at the same time, the performance can be
>>>> improved. I think if we achieve this goal, most common use cases can be
>>>> covered.
>>>> >
>>>> > Currently, for the default configuration, the exclusive buffers
>>>> required at input gate side is still parallelism relevant (though since
>>>> 1.14, we can decouple the network buffer consumption from parallelism by
>>>> setting a config value, it has slight performance influence on streaming
>>>> jobs), which means that no large parallelism can be supported by the
>>>> default configuration. Roughly, I would say the default value can support
>>>> jobs of several hundreds of parallelism.
>>>> >
>>>> > >>> I do feel that this correspondence is a bit difficult to control
>>>> at the moment, and it would be best if a rough table could be provided.
>>>> >
>>>> > I think this is a good suggestion, we can provide those suggestions
>>>> in the document.
>>>> >
>>>> > Best,
>>>> > Yingjie
>>>> >
>>>> > Jingsong Li <jingsongl...@gmail.com> 于2021年12月14日周二 14:39写道:
>>>> >>
>>>> >> Hi  Yingjie,
>>>> >>
>>>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>>>> >> of batch jobs.
>>>> >>
>>>> >> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>>>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>>>> >> network memory and framework.off-heap.size.
>>>> >>
>>>> >> My question is, what is the maximum parallelism a job can have with
>>>> >> the default configuration? (Does this break out of the box)
>>>> >>
>>>> >> How much network memory and framework.off-heap.size are required for
>>>> >> how much parallelism in the default configuration?
>>>> >>
>>>> >> I do feel that this correspondence is a bit difficult to control at
>>>> >> the moment, and it would be best if a rough table could be provided.
>>>> >>
>>>> >> Best,
>>>> >> Jingsong
>>>> >>
>>>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao <kevin.ying...@gmail.com>
>>>> wrote:
>>>> >> >
>>>> >> > Hi Jiangang,
>>>> >> >
>>>> >> > Thanks for your suggestion.
>>>> >> >
>>>> >> > >>> The config can affect the memory usage. Will the related
>>>> memory configs be changed?
>>>> >> >
>>>> >> > I think we will not change the default network memory settings. My
>>>> best expectation is that the default value can work for most cases (though
>>>> may not the best) and for other cases, user may need to tune the memory
>>>> settings.
>>>> >> >
>>>> >> > >>> Can you share the tpcds results for different configs?
>>>> Although we change the default values, it is helpful to change them for
>>>> different users. In this case, the experience can help a lot.
>>>> >> >
>>>> >> > I did not keep all previous TPCDS results, but from the results, I
>>>> can tell that on HDD, always using the sort-shuffle is a good choice. For
>>>> small jobs, using sort-shuffle may not bring much performance gain, this
>>>> may because that all shuffle data can be cached in memory (page cache),
>>>> this is the case if the cluster have enough resources. However, if the
>>>> whole cluster is under heavy burden or you are running large scale jobs,
>>>> the performance of those small jobs can also be influenced. For large-scale
>>>> jobs, the configurations suggested to be tuned are
>>>> taskmanager.network.sort-shuffle.min-buffers and
>>>> taskmanager.memory.framework.off-heap.batch-shuffle.size, you can increase
>>>> these values for large-scale batch jobs.
>>>> >> >
>>>> >> > BTW, I am still running TPCDS tests these days and I can share
>>>> these results soon.
>>>> >> >
>>>> >> > Best,
>>>> >> > Yingjie
>>>> >> >
>>>> >> > 刘建刚 <liujiangangp...@gmail.com> 于2021年12月10日周五 18:30写道:
>>>> >> >>
>>>> >> >> Glad to see the suggestion. In our test, we found that small jobs
>>>> with the changing configs can not improve the performance much just as your
>>>> test. I have some suggestions:
>>>> >> >>
>>>> >> >> The config can affect the memory usage. Will the related memory
>>>> configs be changed?
>>>> >> >> Can you share the tpcds results for different configs? Although
>>>> we change the default values, it is helpful to change them for different
>>>> users. In this case, the experience can help a lot.
>>>> >> >>
>>>> >> >> Best,
>>>> >> >> Liu Jiangang
>>>> >> >>
>>>> >> >> Yun Gao <yungao...@aliyun.com.invalid> 于2021年12月10日周五 17:20写道:
>>>> >> >>>
>>>> >> >>> Hi Yingjie,
>>>> >> >>>
>>>> >> >>> Very thanks for drafting the FLIP and initiating the discussion!
>>>> >> >>>
>>>> >> >>> May I have a double confirmation for
>>>> taskmanager.network.sort-shuffle.min-parallelism that
>>>> >> >>> since other frameworks like Spark have used sort-based shuffle
>>>> for all the cases, does our
>>>> >> >>> current circumstance still have difference with them?
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yun
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> ------------------------------------------------------------------
>>>> >> >>> From:Yingjie Cao <kevin.ying...@gmail.com>
>>>> >> >>> Send Time:2021 Dec. 10 (Fri.) 16:17
>>>> >> >>> To:dev <d...@flink.apache.org>; user <u...@flink.apache.org>;
>>>> user-zh <user-zh@flink.apache.org>
>>>> >> >>> Subject:Re: [DISCUSS] Change some default config values of
>>>> blocking shuffle
>>>> >> >>>
>>>> >> >>> Hi dev & users:
>>>> >> >>>
>>>> >> >>> I have created a FLIP [1] for it, feedbacks are highly
>>>> appreciated.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >> >>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>>>> >> >>> Yingjie Cao <kevin.ying...@gmail.com> 于2021年12月3日周五 17:02写道:
>>>> >> >>>
>>>> >> >>> Hi dev & users,
>>>> >> >>>
>>>> >> >>> We propose to change some default values of blocking shuffle to
>>>> improve the user out-of-box experience (not influence streaming). The
>>>> default values we want to change are as follows:
>>>> >> >>>
>>>> >> >>> 1. Data compression
>>>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>>>> default value is 'false'.  Usually, data compression can reduce both disk
>>>> and network IO which is good for performance. At the same time, it can save
>>>> storage space. We propose to change the default value to true.
>>>> >> >>>
>>>> >> >>> 2. Default shuffle implementation
>>>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>>>> value is 'Integer.MAX', which means by default, Flink jobs will always use
>>>> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>>>> both stability and performance. So we propose to reduce the default value
>>>> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
>>>> 1024 with a tpc-ds and 128 is the best one.)
>>>> >> >>>
>>>> >> >>> 3. Read buffer of sort-shuffle
>>>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>>>> default value is '32M'. Previously, when choosing the default value, both
>>>> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
>>>> way. However, recently, it is reported in the mailing list that the default
>>>> value is not enough which caused a buffer request timeout issue. We already
>>>> created a ticket to improve the behavior. At the same time, we propose to
>>>> increase this default value to '64M' which can also help.
>>>> >> >>>
>>>> >> >>> 4. Sort buffer size of sort-shuffle
>>>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
>>>> value is '64' which means '64' network buffers (32k per buffer by default).
>>>> This default value is quite modest and the performance can be influenced.
>>>> We propose to increase this value to a larger one, for example, 512 (the
>>>> default TM and network buffer configuration can serve more than 10 result
>>>> partitions concurrently).
>>>> >> >>>
>>>> >> >>> We already tested these default values together with tpc-ds
>>>> benchmark in a cluster and both the performance and stability improved a
>>>> lot. These changes can help to improve the out-of-box experience of
>>>> blocking shuffle. What do you think about these changes? Is there any
>>>> concern? If there are no objections, I will make these changes soon.
>>>> >> >>>
>>>> >> >>> Best,
>>>> >> >>> Yingjie
>>>> >> >>>
>>>> >>
>>>> >>
>>>> >> --
>>>> >> Best, Jingsong Lee
>>>>
>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>

回复