This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5ceda3da20ec5ed50849d89a629744fe9d4010d8
Author: Weijie Guo <res...@163.com>
AuthorDate: Wed Feb 8 15:25:40 2023 +0800

    [FLINK-30860][doc] Add document for hybrid shuffle with adaptive batch scheduler.
    
    This closes #21890
---
 docs/content.zh/docs/deployment/elastic_scaling.md |  4 ++--
 docs/content.zh/docs/ops/batch/batch_shuffle.md    | 16 +++++++++++++++-
 docs/content/docs/deployment/elastic_scaling.md    |  4 ++--
 docs/content/docs/ops/batch/batch_shuffle.md       | 16 +++++++++++++++-
 4 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/deployment/elastic_scaling.md 
b/docs/content.zh/docs/deployment/elastic_scaling.md
index 1e7409fb989..c65c974a35b 100644
--- a/docs/content.zh/docs/deployment/elastic_scaling.md
+++ b/docs/content.zh/docs/deployment/elastic_scaling.md
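@@ -155,7 +155,7 @@ Adaptive Batch Scheduler is a batch job scheduler that can automatically adjust the execution plan
 - Deriving parallelism automatically from the data volume adapts better to data volumes that change from day to day
 - Operators in SQL jobs can also be assigned different parallelisms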
 
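-Currently, the Adaptive Batch Scheduler is Flink's default scheduler for batch jobs, so no additional configuration is required unless the user has explicitly configured another scheduler, e.g. `jobmanager.scheduler: default`. Note that, because of ["only jobs whose data exchanges are all in BLOCKING mode are supported"](#局限性-2), [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) must be set to `ALL_EXCHANGES_BLOCKING` (the default).
+Currently, the Adaptive Batch Scheduler is Flink's default scheduler for batch jobs, so no additional configuration is required unless the user has explicitly configured another scheduler, e.g. `jobmanager.scheduler: default`. Note that, because of ["only jobs whose data exchanges are all in BLOCKING or HYBRID mode are supported"](#局限性-2), [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) must be set to `ALL_EXCHANGES_BLOCKING` (the default), `ALL_EXCHANGES_HYBRID_FULL`, or `ALL_EXCHANGES_HYBRID_SELECTIVE`.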
 
 ### Automatically deriving parallelism
 
@@ -185,7 +185,7 @@ Adaptive Batch Scheduler is a batch job scheduler that can automatically adjust the execution plan
 
 ### Limitations
 - **Batch jobs only**: The Adaptive Batch Scheduler only supports batch jobs. An exception is thrown when a streaming job is submitted.
-- **Only jobs whose data exchanges are all in BLOCKING mode are supported**: Currently, the Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is ALL_EXCHANGES_BLOCKING.
+- **Only jobs whose data exchanges are all in BLOCKING or HYBRID mode are supported**: Currently, the Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, or ALL_EXCHANGES_HYBRID_SELECTIVE.
 - **FileInputFormat sources are not supported**: Sources of type FileInputFormat are not supported, including `StreamExecutionEnvironment#readFile(...)`, `StreamExecutionEnvironment#readTextFile(...)`, and `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`. When using the Adaptive Batch Scheduler, users should read files with the new Source API ([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) or [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})).
 - **The upstream output volume and the downstream received volume shown in the Web UI may differ**: When the Adaptive Batch Scheduler derives parallelism automatically, for broadcast edges the amount of data sent by the upstream operator may not equal the amount received by the downstream operator, which may confuse users when displayed in the Web UI. See [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler) for details.
 
diff --git a/docs/content.zh/docs/ops/batch/batch_shuffle.md 
b/docs/content.zh/docs/ops/batch/batch_shuffle.md
index 9ec955ce27c..fda29f75a01 100644
--- a/docs/content.zh/docs/ops/batch/batch_shuffle.md
+++ b/docs/content.zh/docs/ops/batch/batch_shuffle.md
@@ -114,12 +114,25 @@ Hybrid shuffle provides two spilling strategies:
 
 To use hybrid shuffle mode, you need to configure the 
[execution.batch-shuffle-mode]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling 
strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
 
+#### Data Consumption Constraints
+
+Hybrid shuffle divides the partition data consumption constraints between 
producer and consumer into the following three cases:
+
+- **ALL_PRODUCERS_FINISHED** : hybrid partition data can be consumed only when 
all producers are finished.
+- **ONLY_FINISHED_PRODUCERS** : hybrid partition data can only be consumed from finished producers.
+- **UNFINISHED_PRODUCERS** : hybrid partition data can also be consumed from unfinished producers.
+
+These can be configured via [jobmanager.partition.hybrid.partition-data-consume-constraint]({{< ref "docs/deployment/config" >}}#jobmanager-partition-hybrid-partition-data-consume-constraint).
+
+- **For `AdaptiveBatchScheduler`** : The default constraint is 
`UNFINISHED_PRODUCERS` to perform pipelined-like shuffle. If the value is set 
to `ALL_PRODUCERS_FINISHED` or `ONLY_FINISHED_PRODUCERS`, performance may be 
degraded.
+- **If `SpeculativeExecution` is enabled** : The default constraint is 
`ONLY_FINISHED_PRODUCERS` to bring some performance optimization compared with 
blocking shuffle. Since producers and consumers have the opportunity to run at 
the same time, more speculative execution tasks may be created, and the cost of 
failover will also increase. If you want to fall back to the same behavior as 
blocking shuffle, you can configure this value to `ALL_PRODUCERS_FINISHED`. It 
is also important to note  [...]
+
 ### Limitations
 
 Hybrid shuffle mode is still experimental and has some known limitations, 
which the Flink community is still working on eliminating.
 
 - **No support for Slot Sharing.** In hybrid shuffle mode, Flink currently 
forces each task to be executed in a dedicated slot exclusively. If slot 
sharing is explicitly specified, an error will occur.
-- **No support for Adaptive Batch Scheduler and Speculative Execution.** If 
adaptive batch scheduler is used in hybrid shuffle mode, an error will occur.
+- **No pipelined execution for dynamic graphs.** If auto-parallelism (dynamic graph) is enabled, the Adaptive Batch Scheduler waits until upstream tasks finish before deciding the parallelism of downstream tasks, which means hybrid shuffle effectively falls back to blocking shuffle (`ALL_PRODUCERS_FINISHED` constraint).
 
 ## Performance Tuning
 
@@ -140,6 +153,7 @@ Hybrid shuffle mode is still experimental and has some 
known limitations, which
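 1. Increase the total network memory. The current network memory size is fairly conservative. For large-scale jobs, it is recommended to increase the [network memory fraction]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) to at least 0.2 for better performance. For the change to take effect, you may also need to adjust the [lower bound of the network memory size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and the [upper bound of the network memory size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max). For more information, see the [memory configuration documentation]({{< ref "docs/deployment/memory/mem_setup_tm" >}}).
 2. Increase the memory for writing shuffle data. For large-scale jobs, it is recommended to increase the total memory size: the more memory is available for writing data, the more likely downstream tasks are to read data directly from memory. You need to ensure that each `Result Partition` can be allocated at least `numSubpartition + 1` buffers; otherwise you may encounter the "Insufficient number of network buffers" error.
 3. Increase the memory for reading shuffle data. For large-scale jobs, it is recommended to increase the [data read memory]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) to a larger value (for example, 256M or 512M). Because this memory is carved out of the framework off-heap memory, you must add the same amount to [taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) to avoid direct memory out-of-memory errors.
+4. When `Hybrid Shuffle` is used, reducing the number of [exclusive network buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) may seriously affect performance. It is therefore best not to set this value to `0`, and it can be increased appropriately for large-scale jobs. Note also that hybrid shuffle sets [taskmanager.network.memory.read-buffer.required-per-gate.max]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-read-buffer-required-per-gate-max) to `Integer.MAX_VALUE` by default. It is best not to change this option, as doing so may degrade performance.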
 {{< /tab >}}
 
 {{< /tabs >}}
diff --git a/docs/content/docs/deployment/elastic_scaling.md 
b/docs/content/docs/deployment/elastic_scaling.md
index e22ce9a583e..b11e459f294 100644
--- a/docs/content/docs/deployment/elastic_scaling.md
+++ b/docs/content/docs/deployment/elastic_scaling.md
@@ -158,7 +158,7 @@ The Adaptive Batch Scheduler is a batch job scheduler that 
can automatically adj
 - Operators from SQL batch jobs can be assigned different parallelisms, which are automatically tuned
 
 At present, the Adaptive Batch Scheduler is the default scheduler for Flink 
batch jobs. No additional configuration is required unless other schedulers are 
explicitly configured, e.g. `jobmanager.scheduler: default`. Note that you need 
to
-leave the [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) unset or explicitly set it to 
`ALL_EXCHANGES_BLOCKING` (default value) due to ["ALL_EXCHANGES_BLOCKING jobs 
only"](#limitations-2).
+leave the [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) unset or explicitly set it to 
`ALL_EXCHANGES_BLOCKING` (default value) or `ALL_EXCHANGES_HYBRID_FULL` or 
`ALL_EXCHANGES_HYBRID_SELECTIVE` due to ["BLOCKING or HYBRID jobs 
only"](#limitations-2).
 
 ### Automatically decide parallelisms for operators
 
@@ -190,7 +190,7 @@ In addition, there are several related configuration 
options that may need adjus
 ### Limitations
 
 - **Batch jobs only**: Adaptive Batch Scheduler only supports batch jobs. An exception will be thrown if a streaming job is submitted.
-- **ALL_EXCHANGES_BLOCKING jobs only**: At the moment, Adaptive Batch 
Scheduler only supports jobs whose [shuffle mode]({{< ref 
"docs/deployment/config" >}}#execution-batch-shuffle-mode) is 
`ALL_EXCHANGES_BLOCKING`.
+- **BLOCKING or HYBRID jobs only**: At the moment, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is `ALL_EXCHANGES_BLOCKING`, `ALL_EXCHANGES_HYBRID_FULL`, or `ALL_EXCHANGES_HYBRID_SELECTIVE`.
 - **FileInputFormat sources are not supported**: FileInputFormat sources are not supported, including `StreamExecutionEnvironment#readFile(...)`, `StreamExecutionEnvironment#readTextFile(...)`, and `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`. Users should use the new sources ([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) or [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) to read files when using th [...]
 - **Inconsistent broadcast results metrics on WebUI**: When using the Adaptive Batch Scheduler to automatically decide parallelisms for operators, for broadcast results, the number of bytes/records sent by the upstream task as counted by metrics may not equal the number of bytes/records received by the downstream task, which may confuse users when displayed on the Web UI. See [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler) for details.
 
diff --git a/docs/content/docs/ops/batch/batch_shuffle.md 
b/docs/content/docs/ops/batch/batch_shuffle.md
index 1760ecaf6a7..622bc4a7afa 100644
--- a/docs/content/docs/ops/batch/batch_shuffle.md
+++ b/docs/content/docs/ops/batch/batch_shuffle.md
@@ -114,12 +114,25 @@ Hybrid shuffle provides two spilling strategies:
 
 To use hybrid shuffle mode, you need to configure the 
[execution.batch-shuffle-mode]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling 
strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
 
+#### Data Consumption Constraints
+
+Hybrid shuffle divides the partition data consumption constraints between 
producer and consumer into the following three cases:
+
+- **ALL_PRODUCERS_FINISHED** : hybrid partition data can be consumed only when 
all producers are finished.
+- **ONLY_FINISHED_PRODUCERS** : hybrid partition data can only be consumed from finished producers.
+- **UNFINISHED_PRODUCERS** : hybrid partition data can also be consumed from unfinished producers.
+
+These can be configured via [jobmanager.partition.hybrid.partition-data-consume-constraint]({{< ref "docs/deployment/config" >}}#jobmanager-partition-hybrid-partition-data-consume-constraint).
+
+- **For `AdaptiveBatchScheduler`** : The default constraint is 
`UNFINISHED_PRODUCERS` to perform pipelined-like shuffle. If the value is set 
to `ALL_PRODUCERS_FINISHED` or `ONLY_FINISHED_PRODUCERS`, performance may be 
degraded.
+- **If `SpeculativeExecution` is enabled** : The default constraint is 
`ONLY_FINISHED_PRODUCERS` to bring some performance optimization compared with 
blocking shuffle. Since producers and consumers have the opportunity to run at 
the same time, more speculative execution tasks may be created, and the cost of 
failover will also increase. If you want to fall back to the same behavior as 
blocking shuffle, you can configure this value to `ALL_PRODUCERS_FINISHED`. It 
is also important to note  [...]
+
 ### Limitations
 
 Hybrid shuffle mode is still experimental and has some known limitations, 
which the Flink community is still working on eliminating.
 
 - **No support for Slot Sharing.** In hybrid shuffle mode, Flink currently 
forces each task to be executed in a dedicated slot exclusively. If slot 
sharing is explicitly specified, an error will occur.
-- **No support for Adaptive Batch Scheduler and Speculative Execution.** If 
adaptive batch scheduler is used in hybrid shuffle mode, an error will occur.
+- **No pipelined execution for dynamic graphs.** If auto-parallelism (dynamic graph) is enabled, the Adaptive Batch Scheduler waits until upstream tasks finish before deciding the parallelism of downstream tasks, which means hybrid shuffle effectively falls back to blocking shuffle (`ALL_PRODUCERS_FINISHED` constraint).
 
 ## Performance Tuning
 
@@ -140,6 +153,7 @@ The following guidelines may help you to achieve better 
performance especially f
 1. Increase the total size of network memory. Currently, the default network 
memory size is pretty modest. For large scale jobs, it's suggested to increase 
the total [network memory fraction]({{< ref "docs/deployment/config" 
>}}#taskmanager-memory-network-fraction) to at least 0.2 to achieve better 
performance. At the same time, you may also need to adjust the [lower 
bound]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and 
[upper bound]({{< ref "docs/deployment/con [...]
 2. Increase the memory size for shuffle data write. For large scale jobs, it's suggested to increase the total size of network memory: the more memory that can be used in the shuffle write phase, the more opportunities downstream tasks have to read data directly from memory. You need to ensure that each `Result Partition` can be allocated at least `numSubpartition + 1` buffers, otherwise the "Insufficient number of network buffers" error will be encountered.
 3. Increase the memory size for shuffle data read. For large scale jobs, it's 
suggested to increase the size of the [shared read memory]({{< ref 
"docs/deployment/config" 
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) to a larger value 
(for example, 256M or 512M). Because this memory is cut from the framework 
off-heap memory, you must increase 
[taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" 
>}}#taskmanager-memory-framework-off-heap-size) by the [...]
+4. When `Hybrid Shuffle` is used, decreasing the number of [exclusive buffers per channel]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) will seriously affect performance. Therefore, this value should not be set to `0`, and for large-scale jobs it can be increased appropriately. It should also be noted that, for hybrid shuffle, [taskmanager.network.memory.read-buffer.required-per-gate.max]({{< ref "docs/deployment/config" >}}#taskmanager-netwo [...]
 {{< /tab >}}
 
 {{< /tabs >}}
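
For readers who want to try the behaviour documented in this change, the options named in the diff above could be combined in `flink-conf.yaml` roughly as follows. This fragment is an illustrative sketch and not part of the commit: the option keys are taken from the doc links above, while the concrete values (in particular the buffers-per-channel value) are assumptions chosen for the example.

```yaml
# Illustrative flink-conf.yaml fragment (not part of the commit).

# Enable hybrid shuffle with the full spilling strategy.
# ALL_EXCHANGES_HYBRID_SELECTIVE would select the selective strategy instead.
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL

# Data consumption constraint for hybrid partitions. Per the doc text above,
# UNFINISHED_PRODUCERS is already the default for the Adaptive Batch Scheduler;
# it is spelled out here only to make the choice explicit.
jobmanager.partition.hybrid.partition-data-consume-constraint: UNFINISHED_PRODUCERS

# Tuning hint 1 above: raise the network memory fraction to at least 0.2
# for large-scale jobs.
taskmanager.memory.network.fraction: 0.2

# Tuning hint 4 above: do not set exclusive buffers per channel to 0 when
# hybrid shuffle is used. The value 2 is an assumed example, not a recommendation.
taskmanager.network.memory.buffers-per-channel: 2
```

With speculative execution enabled, the doc text above indicates the constraint defaults to `ONLY_FINISHED_PRODUCERS`, and setting it to `ALL_PRODUCERS_FINISHED` restores blocking-shuffle behaviour.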
