Re: [DISCUSS] Change some default config values of blocking shuffle

2022-01-06 Thread Yingjie Cao
Hi all,

Thanks very much for all the feedback. It seems that we have reached a
consensus. I will start a vote soon.

Best,
Yingjie

Yun Gao  于2022年1月5日周三 16:08写道:

> Thanks very much @Yingjie for completing the experiments!
>
> Also +1 for changing the default config values. From the experiments,
> changing the default config values would largely improve the out-of-the-box
> experience of Flink batch, so it seems worth changing from my side,
> even if it would cause some compatibility issues in some scenarios. In
> addition, if we finally have to break compatibility, we might as well do it
> early to avoid affecting more users.
>
> Best,
> Yun
>
> --
> From:刘建刚 
> Send Time:2022 Jan. 4 (Tue.) 20:43
> To:user-zh 
> Cc:dev ; user 
> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>
> Thanks for the experiment. +1 for the changes.
>

Re: [DISCUSS] Change some default config values of blocking shuffle

2022-01-04 Thread Yingjie Cao
Hi all,

After running some tests with the proposed default values (
taskmanager.network.sort-shuffle.min-parallelism: 1,
taskmanager.network.sort-shuffle.min-buffers: 512,
taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
taskmanager.network.blocking-shuffle.compression.enabled: true), I'd like to
share some test results.

1. TPC-DS performance and stability test (I ran the TPC-DS benchmark with
512 default parallelism and several different settings multiple times):
1) Stability:
Compared to the current default values, the proposed default values can
improve the TPC-DS query stability a lot. With the current defaults, many
queries suffer from blocking-shuffle-related failures. With the proposed
default values, only three queries fail because of the "Insufficient number
of network buffers:" error. With 512 parallelism, the current default
configuration incurs the same issue. Part of the reason is that the network
buffers consumed by an InputGate are proportional to the parallelism and can
use up to 32M of network memory by default; many tasks have several
InputGates, but we only have 128M of network memory per TaskManager by
default.
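
To make the arithmetic concrete, here is a rough sketch (assuming the
default 32k buffer size and 2 exclusive buffers per channel, and ignoring
the few floating buffers per gate):

    512 channels * 2 exclusive buffers * 32k = 32M per InputGate
    128M default network memory / 32M per gate = only about 4 such InputGates per TaskManager
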
2) Performance:
Compared to the current default values, the proposed default values can
improve the TPC-DS query performance a lot. Except for queries that shuffle
only a small amount of data and finish quickly, the proposed default values
bring a 2-10x performance gain. Regarding the default value of
taskmanager.network.sort-shuffle.min-parallelism proposed by Yun, I tested
both 1 and 128, and 1 performs better, which is as expected.

2. Flink pre-commit stability test:
I have run all Flink tests with the proposed default values more than 20
times. The only instability is the "Insufficient number of network
buffers:" error in several batch test cases. This error occurs because some
tests run with very limited network buffers and the proposed default config
values may increase the network buffer consumption for these cases. After
increasing the total network memory size for these test cases, the issue is
solved.

Summary:
1. The proposed default values can improve both the performance and
stability of Flink batch shuffle a lot.
2. Some batch jobs may fail with the "Insufficient number of network
buffers:" error, because this default value change slightly increases the
network buffer consumption for jobs with parallelism below 512 (for jobs
with parallelism above 512, network buffer consumption is reduced).
3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has better
performance than setting it to 128; both settings may incur the
"Insufficient number of network buffers:" error.
4. After changing the default values and fixing several test cases, all
Flink tests (except for the known unstable cases) can run stably.

Personally, I am +1 to make the change. Though the change may cause some
batch jobs to fail with the "Insufficient number of network buffers:" error,
the probability is small (only 3 of about 100 TPC-DS queries fail, and these
queries also fail with the current default configuration, because it is the
InputGate that takes most of the network buffers and causes the error).
Compared to this small regression, the performance and stability gains are
big. Any feedback, especially from Flink batch users, is highly appreciated.

BTW, aside from the above tests, I also tried to tune some more config
options to make the TPC-DS test faster. I copied these tuned config options
from our daily TPC-DS settings. The results show that the optimized
configuration can improve the TPC-DS performance by about 30%. Though these
settings may not be the best, they really help compared to the default
values. I attached some settings in this mail; I guess some Flink batch
users may be interested in them. Based on my limited knowledge, I guess that
increasing the total TaskManager size and the network memory size is
important for performance, because more memory (managed and network) makes
operators and shuffle faster.
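
As a purely hypothetical illustration of those two dimensions (the numbers
below are made up for illustration and are not the attached settings),
scaling up a TaskManager for batch could look like:

    taskmanager.memory.process.size: 8g        # larger total TaskManager memory
    taskmanager.memory.network.fraction: 0.2   # larger share for network memory
    taskmanager.memory.network.max: 2g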

Best,
Yingjie



Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-14 Thread Yingjie Cao
Hi Till,

Thanks for the suggestion. I think it makes a lot of sense to also extend
the documentation for the sort shuffle to include a tuning guide.

Best,
Yingjie

Till Rohrmann  于2021年12月14日周二 18:57写道:

> As part of this FLIP, does it make sense to also extend the documentation
> for the sort shuffle [1] to include a tuning guide? I am thinking of a more
> in-depth description of what things you might observe and how to influence
> them with the configuration options.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>
> Cheers,
> Till
>
> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li 
> wrote:
>
>> Hi Yingjie,
>>
>> Thanks for your explanation. I have no more questions. +1
>>

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 Thread Yingjie Cao
Hi Jingsong,

Thanks for your feedback.

>>> My question is, what is the maximum parallelism a job can have with the
default configuration? (Does this break out of the box)

Yes, you are right, these two options are related to network memory and
framework off-heap memory. Generally, these changes will not break the
out-of-the-box experience, but in some extreme cases, for example, when
there are too many ResultPartitions per task, users may need to increase the
network memory to avoid the "insufficient network buffer" error. For
framework off-heap memory, I believe users do not need to change the default
value.

In fact, I have a basic goal when changing these config values: when
running TPC-DS with medium parallelism and the default values, all queries
must pass without any error and, at the same time, performance should
improve. I think if we achieve this goal, most common use cases can be
covered.

Currently, for the default configuration, the exclusive buffers required at
the input gate side are still proportional to the parallelism (though since
1.14, we can decouple the network buffer consumption from parallelism by
setting a config value, which has a slight performance influence on
streaming jobs), which means that large parallelism cannot be supported by
the default configuration. Roughly, I would say the default values can
support jobs with several hundreds of parallelism.
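
For reference, a minimal sketch of such a decoupled setup, assuming the
config value meant here is taskmanager.network.memory.buffers-per-channel:

    # assumption: 0 exclusive buffers per channel removes the
    # parallelism-proportional part; only the floating buffers remain
    taskmanager.network.memory.buffers-per-channel: 0
    taskmanager.network.memory.floating-buffers-per-gate: 8   # the default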

>>> I do feel that this correspondence is a bit difficult to control at the
moment, and it would be best if a rough table could be provided.

I think this is a good suggestion; we can provide such a table in the
documentation.

Best,
Yingjie

Jingsong Li  于2021年12月14日周二 14:39写道:

> Hi Yingjie,
>
> +1 for this FLIP. I'm pretty sure this will greatly improve the ease of
> use of batch jobs.
>
> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
> and "taskmanager.network.sort-shuffle.min-buffers" are related to
> network memory and framework.off-heap.size.
>
> My question is, what is the maximum parallelism a job can have with
> the default configuration? (Does this break the out-of-the-box experience?)
>
> How much network memory and framework.off-heap.size are required for
> how much parallelism in the default configuration?
>
> I do feel that this correspondence is a bit difficult to control at
> the moment, and it would be best if a rough table could be provided.
>
> Best,
> Jingsong
>

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 Thread Yingjie Cao
Hi Jiangang,

Thanks for your suggestion.

>>> The config can affect the memory usage. Will the related memory configs
be changed?

I think we will not change the default network memory settings. My
expectation is that the default values can work for most cases (though
maybe not optimally), and for other cases, users may need to tune the
memory settings.

>>> Can you share the tpcds results for different configs? Although we
change the default values, it is helpful to change them for different
users. In this case, the experience can help a lot.

I did not keep all previous TPC-DS results, but from those results I can
tell that on HDD, always using sort-shuffle is a good choice. For small
jobs, sort-shuffle may not bring much performance gain, probably because all
shuffle data can be cached in memory (page cache); this is the case if the
cluster has enough resources. However, if the whole cluster is under heavy
load or you are running large-scale jobs, the performance of those small
jobs can also be affected. For large-scale jobs, the configurations
suggested for tuning are taskmanager.network.sort-shuffle.min-buffers and
taskmanager.memory.framework.off-heap.batch-shuffle.size; you can increase
these values for large-scale batch jobs.
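
For example, a large-scale job could raise both knobs, something like the
following (the exact numbers are illustrative only, not a general
recommendation):

    taskmanager.network.sort-shuffle.min-buffers: 2048
    taskmanager.memory.framework.off-heap.batch-shuffle.size: 256m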

BTW, I am still running TPC-DS tests these days and I can share those
results soon.

Best,
Yingjie

刘建刚  于2021年12月10日周五 18:30写道:

> Glad to see the suggestion. In our tests, we found that, just as in your
> tests, the changed configs cannot improve the performance of small jobs
> much. I have some suggestions:
>
>    - The configs can affect the memory usage. Will the related memory
>    configs be changed?
>    - Can you share the TPC-DS results for different configs? Although we
>    change the default values, it is helpful to tune them for different
>    users. In this case, the experience can help a lot.
>
> Best,
> Liu Jiangang
>

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 Thread Yingjie Cao
Hi Yun,

Thanks for your feedback.

I think setting taskmanager.network.sort-shuffle.min-parallelism to 1 and
using sort-shuffle for all cases by default is a good suggestion. I did not
choose this value mainly for two reasons:

1. It increases the usage of network memory, which may cause the
"insufficient network buffer" exception, and users may have to increase the
total network buffers.
2. Several (not many) TPC-DS queries suffer some performance regression on
SSD.

For the first issue, I will test more settings on TPC-DS and see the
influence. For the second issue, I will try to find the cause and solve it
in 1.15.

I am open to your suggestion, but I still need some more tests and
analysis to guarantee that it works well.

Best,
Yingjie

Yun Gao  于2021年12月10日周五 17:19写道:

> Hi Yingjie,
>
> Thanks very much for drafting the FLIP and initiating the discussion!
>
> May I have a double confirmation for
> taskmanager.network.sort-shuffle.min-parallelism: since other frameworks
> like Spark have used sort-based shuffle for all cases, does our current
> circumstance still differ from theirs?
>
> Best,
> Yun
>
>
>


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-10 Thread Yingjie Cao
Hi dev & users,

I have created a FLIP [1] for it; feedback is highly appreciated.

Best,
Yingjie

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability



Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-06 Thread Yingjie Cao
Hi Till,

Thanks for your feedback.

>>> How will our tests be affected by these changes? Will Flink require
more resources and, thus, will it risk destabilizing our testing
infrastructure?

There are some tests that need to be adjusted, for example,
BlockingShuffleITCase. For other tests, theoretically, the influence should
be small. I will further run all tests multiple times (like 10 or 20) to
ensure that there are no test stability issues before making the change.

>>> I would propose to create a FLIP for these changes since you propose to
change the default behaviour. It can be a very short one, though.

Yes, you are right. I will prepare a simple FLIP soon.

Best,
Yingjie


Till Rohrmann  于2021年12月3日周五 18:39写道:

> Thanks for starting this discussion Yingjie,
>
> How will our tests be affected by these changes? Will Flink require more
> resources and, thus, will it risk destabilizing our testing infrastructure?
>
> I would propose to create a FLIP for these changes since you propose to
> change the default behaviour. It can be a very short one, though.
>
> Cheers,
> Till
>


[DISCUSS] Change some default config values of blocking shuffle

2021-12-03 Thread Yingjie Cao
Hi dev & users,

We propose to change some default config values of blocking shuffle to
improve the user out-of-the-box experience (streaming is not influenced).
The default values we want to change are as follows:

1. Data compression
(taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
default value is 'false'. Usually, data compression can reduce both disk
and network IO, which is good for performance. At the same time, it can
save storage space. We propose to change the default value to 'true'.

2. Default shuffle implementation
(taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
value is 'Integer.MAX_VALUE', which means that by default, Flink jobs will
always use hash-shuffle. In fact, for high parallelism, sort-shuffle is
better for both stability and performance. So we propose to reduce the
default value to a properly smaller one, for example, 128. (We tested 128,
256, 512 and 1024 with TPC-DS, and 128 was the best one.)

3. Read buffer of sort-shuffle
(taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
default value is '32M'. Previously, when choosing the default value, both
'32M' and '64M' were OK in tests, and we chose the smaller one to be
cautious. However, it was recently reported on the mailing list that the
default value is not enough, which caused a buffer request timeout issue.
We have already created a ticket to improve the behavior. At the same time,
we propose to increase this default value to '64M', which can also help.

4. Sort buffer size of sort-shuffle
(taskmanager.network.sort-shuffle.min-buffers): Currently, the default
value is '64', which means 64 network buffers (32k per buffer by default).
This default value is quite modest and performance can be affected. We
propose to increase it to a larger value, for example, 512 (the default TM
and network buffer configuration can serve more than 10 result partitions
concurrently).
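
Putting the four proposals together as flink-conf.yaml entries (the current
defaults are shown in the comments):

    taskmanager.network.blocking-shuffle.compression.enabled: true  # currently false
    taskmanager.network.sort-shuffle.min-parallelism: 128           # currently Integer.MAX_VALUE
    taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m   # currently 32m
    taskmanager.network.sort-shuffle.min-buffers: 512               # currently 64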

We have already tested these default values together with the TPC-DS
benchmark in a cluster, and both performance and stability improved a lot.
These changes can help to improve the out-of-the-box experience of blocking
shuffle. What do you think about these changes? Is there any concern? If
there are no objections, I will make these changes soon.

Best,
Yingjie


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-12-01 Thread Yingjie Cao
Hi Jiangang,

Great to hear that! Welcome to work together to make the project better.

Best,
Yingjie

刘建刚  于2021年12月1日周三 下午3:27写道:

> Good work for Flink's batch processing!
> A remote shuffle service can resolve the container-lost problem and reduce
> the running time of batch jobs after a failover. We have investigated the
> component a lot and welcome Flink's native solution. We will try it and
> help improve it.
>
> Thanks,
> Liu Jiangang
>


[ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Yingjie Cao
Hi dev & users,

We are happy to announce the open source of the remote shuffle project [1]
for Flink. The project originated at Alibaba, and the main motivation is to
improve batch data processing for both performance and stability, and to
further embrace cloud native. For more features of the project, please
refer to [1].

Before going open source, the project was used widely in production, and it
behaves well in terms of both stability and performance. We hope you enjoy
it. Collaboration and feedback are highly appreciated.

Best,
Yingjie on behalf of all contributors

[1] https://github.com/flink-extended/flink-remote-shuffle


[DISCUSS] FLIP-184: Refine ShuffleMaster lifecycle management for pluggable shuffle service framework

2021-07-13 Thread Yingjie Cao
Hi devs and users,

This topic was originally discussed and reached a consensus in [1]. Because
the change touches the pluggable shuffle interface, which, though not
currently annotated as public, may already be used by some users, I want to
avoid bringing compatibility issues to customized shuffle plugins that are
already implemented. I am therefore writing a FLIP [2] which contains only
the minimal changes, and I am involving the user mailing list. If there are
no objections, I will start a vote soon.

[1]
https://lists.apache.org/thread.html/radbbabfcfb6bec305ddf7aeefb983232f96b18ba013f0ae2ee500288%40%3Cdev.flink.apache.org%3E
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-184%3A+Refine+ShuffleMaster+lifecycle+management+for+pluggable+shuffle+service+framework

Best,
Yingjie


Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 Thread Yingjie Cao
Maybe you can try to
increase taskmanager.network.retries,
taskmanager.network.netty.server.backlog and
taskmanager.network.netty.sendReceiveBufferSize. These options are useful
for our jobs.
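
For example, something like the following (the values are illustrative
only; ours are tuned per cluster):

    taskmanager.network.retries: 10
    taskmanager.network.netty.server.backlog: 1024
    # in bytes; 0 means the system default is used
    taskmanager.network.netty.sendReceiveBufferSize: 4194304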

yidan zhao  于2021年6月16日周三 下午7:10写道:

> Hi, Yingjie.
> If the network is not stable, which config parameter should I adjust?
>
> yidan zhao  于2021年6月16日周三 下午6:56写道:
> >
> > 2: I use G1, and no full GC occurred; young GC count: 422, time:
> > 142892, so it is not bad.
> > 3: It is a stream job.
> > 4: I will try to config taskmanager.network.retries, whose default is
> > 0, and taskmanager.network.netty.client.connectTimeoutSec's default
> > is 120s.
> > 5: I checked the net fd number of the TaskManager; it is about 1000+,
> > so I think it is a reasonable value.
> >
> > 1: Can not be sure.
> >


Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 Thread Yingjie Cao
Hi yidan,

1. Is the network stable?
2. Is there any GC problem?
3. Is it a batch job? If so, please use sort-shuffle, see [1] for more
information.
4. You may try to config these two options: taskmanager.network.retries,
taskmanager.network.netty.client.connectTimeoutSec. More relevant options
can be found in the 'Data Transport Network Stack' section of [2].
5. If it is none of the above cases, it may be related to [3]; you may need
to check the number of TCP connections per TM and node.

Hope this helps.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
[3] https://issues.apache.org/jira/browse/FLINK-22643

Best,
Yingjie

yidan zhao  于2021年6月16日周三 下午3:36写道:

> The attachment is the exception stack from Flink's web UI. Has anyone
> else met this problem?
>
> Flink 1.12 - Flink 1.13.1. Standalone cluster, including 30 containers,
> each with 28G memory.
>


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-08 Thread Yingjie Cao
Hi Till,

Thanks for the suggestion. The blog post is already on the way.

Best,
Yingjie

Till Rohrmann  于2021年6月8日周二 下午5:30写道:

> Thanks for the update Yingjie. Would it make sense to write a short blog
> post about this feature including some performance improvement numbers? I
> think this could be interesting to our users.
>
> Cheers,
> Till
>
> On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li  wrote:
>
>> Thanks Yingjie for the great effort!
>>
>> This is really helpful to Flink Batch users!
>>
>> Best,
>> Jingsong
>>


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-06 Thread Yingjie Cao
Hi devs & users,

FLIP-148 [1] has been released with Flink 1.13, and the final
implementation has some differences compared with the initial proposal in
the FLIP document. To avoid potential misunderstandings, I have updated the
FLIP document [1] accordingly, and I have also drafted another document [2]
which contains more implementation details. FYI.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
[2]
https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing

Best,
Yingjie

Yingjie Cao  于2020年10月15日周四 上午11:02写道:

> Hi devs,
>
> Currently, Flink adopts a hash-style blocking shuffle implementation which
> writes data sent to different reducer tasks into separate files
> concurrently. Compared to the sort-merge based approach, which writes those
> data together into a single file and merges those small files into bigger
> ones, the hash-based approach has several weak points when it comes to
> running large-scale batch jobs:
>
>    1. *Stability*: For high-parallelism (tens of thousands) batch jobs,
>    the current hash-based blocking shuffle implementation writes too many
>    files concurrently, which puts high pressure on the file system, for
>    example, maintenance of too many file metas and exhaustion of inodes or
>    file descriptors. All of these can be potential stability issues.
>    Sort-merge based blocking shuffle doesn't have this problem because for
>    one result partition, only one file is written at the same time.
>    2. *Performance*: Large amounts of small shuffle files and random IO
>    can influence shuffle performance a lot, especially on HDD (for SSD,
>    sequential read is also important because of read-ahead and caching). For
>    batch jobs processing massive data, a small amount of data per
>    subpartition is common because of the high parallelism. Besides, data
>    skew is another cause of small subpartition files. By merging the data of
>    all subpartitions together in one file, more sequential reads can be
>    achieved.
>    3. *Resource*: For the current hash-based implementation, each
>    subpartition needs at least one buffer. For large-scale batch shuffles,
>    the memory consumption can be huge. For example, we need at least 320M of
>    network memory per result partition if the parallelism is set to 10000,
>    and because of the huge network memory consumption, it is hard to
>    configure the network memory for large-scale batch jobs, and sometimes
>    the parallelism cannot be increased just because of insufficient network
>    memory, which leads to a bad user experience.
>
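> (A quick sanity check of that number, assuming the default 32k buffer size
> and one buffer per subpartition: 10000 subpartitions * 32k ≈ 320M for a
> single result partition.)
>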
> To improve Flink's capability of running large-scale batch jobs, we would
> like to introduce sort-merge based blocking shuffle to Flink [1]. Any
> feedback is appreciated.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>
> Best,
> Yingjie
>


Re: With the same checkpoint interval, Flink 1.12 job checkpoints take longer and fail in production, while the Flink 1.9 job runs normally

2021-03-30 Thread Yingjie Cao
Hi Haihang,

After scanning the user mailing list, I found that some users have reported
checkpoint timeouts when using unaligned checkpoints. Can you share which
checkpoint mode you use? (The information can be found in the log or in the
checkpoint -> configuration tab in the web UI.)

Best,
Yingjie

>


Re: With the same checkpoint interval, Flink 1.12 job checkpoints take longer and fail in production, while the Flink 1.9 job runs normally

2021-03-30 Thread Yingjie Cao
Hi Haihang,

I think your issue is not related to FLINK-16404
<https://issues.apache.org/jira/browse/FLINK-16404>, because that change
should have a small impact on checkpoint time; we already have a micro
benchmark for that change (1s checkpoint interval) and no regression was
seen.

Could you share some more information, for example, the stack trace of the
task which cannot finish the checkpoint?

Best,
Yingjie

Haihang Jing  于2021年3月25日周四 上午10:58写道:

> Hi Congxian, thanks for your reply.
> Job run on Flink 1.9 (checkpoint interval 3min):
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/6.png>
>
> Job run on Flink 1.12 (checkpoint interval 10min):
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/7.png>
>
> Job run on Flink 1.12 (checkpoint interval 3min):
> Pic1: Time used to complete the checkpoint in 1.12 is longer (5m32s):
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/2.png>
>
> Pic2: Start delay (4m27s):
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/1.png>
>
> Pic3: Next checkpoint failed (task 141 ack n/a):
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/3.png>
>
> Pic4: Did not see backpressure or data skew:
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/4.png>
>
> Pic5: Subtasks deal with the same number of records; checkpoint end-to-end is fast:
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/5.png>
>
> Best,
> Haihang
>
>
>
>


Re: What happens to the channels when there is backpressure?

2019-11-27 Thread yingjie cao
Hi Felipe,

That depends on what you mean by 'bandwidth'. If you mean the capability
of the network stack, the answer would be no.

Here is a post about the Flink network stack which may help:
https://flink.apache.org/2019/06/05/flink-network-stack.html.

Thanks,
Yingjie

Felipe Gutierrez  于2019年11月27日周三 下午11:13写道:

> Hi community,
>
> I have a question about backpressure. Suppose a scenario where I have a map
> and a reducer, and the reducer is backpressuring the map operator. I know
> that the reducer is processing tuples at a lower rate than it is receiving
> them.
>
> However, can I say that at least one channel between the map and the
> reducer is fully using its available bandwidth?
>
> My guess is that it is not, at least in the beginning. But as time goes on,
> the tuples will be queued in the network buffers of the reducer and then
> the bandwidth will be at 100% usage. Am I right?
>
> Thanks,
> Felipe
>
>
>