This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5ceda3da20ec5ed50849d89a629744fe9d4010d8
Author: Weijie Guo <res...@163.com>
AuthorDate: Wed Feb 8 15:25:40 2023 +0800

    [FLINK-30860][doc] Add document for hybrid shuffle with adaptive batch scheduler.

    This closes #21890
---
 docs/content.zh/docs/deployment/elastic_scaling.md |  4 ++--
 docs/content.zh/docs/ops/batch/batch_shuffle.md    | 16 +++++++++++++++-
 docs/content/docs/deployment/elastic_scaling.md    |  4 ++--
 docs/content/docs/ops/batch/batch_shuffle.md       | 16 +++++++++++++++-
 4 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/deployment/elastic_scaling.md b/docs/content.zh/docs/deployment/elastic_scaling.md
index 1e7409fb989..c65c974a35b 100644
--- a/docs/content.zh/docs/deployment/elastic_scaling.md
+++ b/docs/content.zh/docs/deployment/elastic_scaling.md
@@ -155,7 +155,7 @@ Adaptive Batch Scheduler 是一种可以自动调整执行计划的批作业调
 - 根据数据量自动推导并行度可以更好地适应每天变化的数据量
 - SQL作业中的算子也可以分配不同的并行度
 
-当前 Adaptive Batch Scheduler 是 Flink 默认的批作业调度器,无需额外配置。除非用户显式的配置了使用其他调度器,例如 `jobmanager.scheduler: default`。需要注意的是,由于 ["只支持所有数据交换都为 BLOCKING 模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 `ALL_EXCHANGES_BLOCKING`(默认值) 。
+当前 Adaptive Batch Scheduler 是 Flink 默认的批作业调度器,无需额外配置。除非用户显式的配置了使用其他调度器,例如 `jobmanager.scheduler: default`。需要注意的是,由于 ["只支持所有数据交换都为 BLOCKING 或 HYBRID 模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 `ALL_EXCHANGES_BLOCKING`(默认值) 或 `ALL_EXCHANGES_HYBRID_FULL` 或 `ALL_EXCHANGES_HYBRID_SELECTIVE`。
 
 ### 自动推导并发度
 
@@ -185,7 +185,7 @@ Adaptive Batch Scheduler 是一种可以自动调整执行计划的批作业调
 ### 局限性
 
 - **只支持批作业**: Adaptive Batch Scheduler 只支持批作业。当提交的是一个流作业时,会抛出异常。
-- **只支持所有数据交换都为 BLOCKING 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 为 ALL_EXCHANGES_BLOCKING 的作业。
+- **只支持所有数据交换都为 BLOCKING 或 HYBRID 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 为 ALL_EXCHANGES_BLOCKING 或 ALL_EXCHANGES_HYBRID_FULL 或 ALL_EXCHANGES_HYBRID_SELECTIVE 的作业。
 - **不支持 FileInputFormat 类型的 source**: 不支持 FileInputFormat 类型的 source, 包括 `StreamExecutionEnvironment#readFile(...)` `StreamExecutionEnvironment#readTextFile(...)` 和 `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`。 当使用 Adaptive Batch Scheduler 时,用户应该使用新版的 Source API ([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) 或 [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) 来读取文件.
 - **Web UI 上展示的上游输出的数据量和下游收到的数据量可能不一致**: 在使用 Adaptive Batch Scheduler 自动推导并行度时,对于 broadcast 边,上游算子发送的数据量和下游算子接收的数据量可能会不相等,这在 Web UI 的显示上可能会困扰用户。细节详见 [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler)。
 
diff --git a/docs/content.zh/docs/ops/batch/batch_shuffle.md b/docs/content.zh/docs/ops/batch/batch_shuffle.md
index 9ec955ce27c..fda29f75a01 100644
--- a/docs/content.zh/docs/ops/batch/batch_shuffle.md
+++ b/docs/content.zh/docs/ops/batch/batch_shuffle.md
@@ -114,12 +114,25 @@ Hybrid shuffle provides two spilling strategies:
 
 To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
 
+#### Data Consumption Constraints
+
+Hybrid shuffle divides the partition data consumption constraints between producer and consumer into the following three cases:
+
+- **ALL_PRODUCERS_FINISHED** : hybrid partition data can be consumed only when all producers are finished.
+- **ONLY_FINISHED_PRODUCERS** : hybrid partition can only consume data from finished producers.
+- **UNFINISHED_PRODUCERS** : hybrid partition can consume data from unfinished producers.
+
+These could be configured via [jobmanager.partition.hybrid.partition-data-consume-constraint]({{< ref "docs/deployment/config" >}}#jobmanager-partition-hybrid-partition-data-consume-constraint).
+
+- **For `AdaptiveBatchScheduler`** : The default constraint is `UNFINISHED_PRODUCERS` to perform pipelined-like shuffle. If the value is set to `ALL_PRODUCERS_FINISHED` or `ONLY_FINISHED_PRODUCERS`, performance may be degraded.
+- **If `SpeculativeExecution` is enabled** : The default constraint is `ONLY_FINISHED_PRODUCERS` to bring some performance optimization compared with blocking shuffle. Since producers and consumers have the opportunity to run at the same time, more speculative execution tasks may be created, and the cost of failover will also increase. If you want to fall back to the same behavior as blocking shuffle, you can configure this value to `ALL_PRODUCERS_FINISHED`. It is also important to note [...]
+
 ### Limitations
 
 Hybrid shuffle mode is still experimental and has some known limitations, which the Flink community is still working on eliminating.
 
 - **No support for Slot Sharing.** In hybrid shuffle mode, Flink currently forces each task to be executed in a dedicated slot exclusively. If slot sharing is explicitly specified, an error will occur.
-- **No support for Adaptive Batch Scheduler and Speculative Execution.** If adaptive batch scheduler is used in hybrid shuffle mode, an error will occur.
+- **No pipelined execution for dynamic graph.** If auto-parallelism (dynamic graph) is enabled, Adaptive Batch Scheduler will wait until upstream tasks finish to decide parallelism of downstream tasks, which means hybrid shuffle effectively fallback to blocking shuffle (`ALL_PRODUCERS_FINISHED` constraint).
 
 ## 性能调优
 
@@ -140,6 +153,7 @@ Hybrid shuffle mode is still experimental and has some known limitations, which
 1. 增大总的网络内存。目前网络内存的大小是比较保守的。对于大规模作业,为了实现更好的性能,建议将 [网络内存比例]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) 增加至至少 0.2。为了使调整生效,你可能需要同时调整 [网络内存大小下界]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) 以及 [网络内存大小上界]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max)。要获取更多信息,你可以参考这个 [内存配置文档]({{< ref "docs/deployment/memory/mem_setup_tm" >}})。
 2. 增大数据写出内存。对于大规模作业, 建议增大总内存大小,用于数据写入的内存越大, 下游越有机会直接从内存读取数据. 你需要保证每个 `Result Partition` 至少能够分配到 `numSubpartition + 1` 个buffer, 否则可能会遇到 "Insufficient number of network buffers" 错误。
 3. 增大数据读取内存。对于大规模作业,建议增大 [数据读取内存]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) 到一个较大的值 (比如,256M 或 512M)。因为这个内存是从框架的堆外内存切分出来的,因此你必须增加相同的内存大小到 [taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) 以避免出现直接内存溢出错误。
+4. 当使用 `Hybrid Shuffle` 时, 减少 [独占网络缓冲区]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) 可能会严重影响性能. 因此,最好不要将改值设置为 `0`, 并且对于大规模作业可以适当增加该值. 同样需要注意的是: hybrid shuffle 默认会将 [taskmanager.network.memory.read-buffer.required-per-gate.max]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-read-buffer-required-per-gate-max) 设置为 `Integer.MAX_VALUE`. 最好不要去调整该配置,否则可能会造成性能的下降.
 {{< /tab >}}
 {{< /tabs >}}
 
diff --git a/docs/content/docs/deployment/elastic_scaling.md b/docs/content/docs/deployment/elastic_scaling.md
index e22ce9a583e..b11e459f294 100644
--- a/docs/content/docs/deployment/elastic_scaling.md
+++ b/docs/content/docs/deployment/elastic_scaling.md
@@ -158,7 +158,7 @@ The Adaptive Batch Scheduler is a batch job scheduler that can automatically adj
 - Operators from SQL batch jobs can be assigned with different parallelisms which are automatically tuned
 
 At present, the Adaptive Batch Scheduler is the default scheduler for Flink batch jobs. No additional configuration is required unless other schedulers are explicitly configured, e.g. `jobmanager.scheduler: default`. Note that you need to
-leave the [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) unset or explicitly set it to `ALL_EXCHANGES_BLOCKING` (default value) due to ["ALL_EXCHANGES_BLOCKING jobs only"](#limitations-2).
+leave the [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) unset or explicitly set it to `ALL_EXCHANGES_BLOCKING` (default value) or `ALL_EXCHANGES_HYBRID_FULL` or `ALL_EXCHANGES_HYBRID_SELECTIVE` due to ["BLOCKING or HYBRID jobs only"](#limitations-2).
 
 ### Automatically decide parallelisms for operators
 
@@ -190,7 +190,7 @@ In addition, there are several related configuration options that may need adjus
 ### Limitations
 
 - **Batch jobs only**: Adaptive Batch Scheduler only supports batch jobs. Exception will be thrown if a streaming job is submitted.
-- **ALL_EXCHANGES_BLOCKING jobs only**: At the moment, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is `ALL_EXCHANGES_BLOCKING`.
+- **BLOCKING or HYBRID jobs only**: At the moment, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is `ALL_EXCHANGES_BLOCKING / ALL_EXCHANGES_HYBRID_FULL / ALL_EXCHANGES_HYBRID_SELECTIVE`.
 - **FileInputFormat sources are not supported**: FileInputFormat sources are not supported, including `StreamExecutionEnvironment#readFile(...)` `StreamExecutionEnvironment#readTextFile(...)` and `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`. Users should use the new sources([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) or [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) to read files when using th [...]
 - **Inconsistent broadcast results metrics on WebUI**: When use Adaptive Batch Scheduler to automatically decide parallelisms for operators, for broadcast results, the number of bytes/records sent by the upstream task counted by metric is not equal to the number of bytes/records received by the downstream task, which may confuse users when displayed on the Web UI. See [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler) for details.
 
diff --git a/docs/content/docs/ops/batch/batch_shuffle.md b/docs/content/docs/ops/batch/batch_shuffle.md
index 1760ecaf6a7..622bc4a7afa 100644
--- a/docs/content/docs/ops/batch/batch_shuffle.md
+++ b/docs/content/docs/ops/batch/batch_shuffle.md
@@ -114,12 +114,25 @@ Hybrid shuffle provides two spilling strategies:
 
 To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
 
+#### Data Consumption Constraints
+
+Hybrid shuffle divides the partition data consumption constraints between producer and consumer into the following three cases:
+
+- **ALL_PRODUCERS_FINISHED** : hybrid partition data can be consumed only when all producers are finished.
+- **ONLY_FINISHED_PRODUCERS** : hybrid partition can only consume data from finished producers.
+- **UNFINISHED_PRODUCERS** : hybrid partition can consume data from unfinished producers.
+
+These could be configured via [jobmanager.partition.hybrid.partition-data-consume-constraint]({{< ref "docs/deployment/config" >}}#jobmanager-partition-hybrid-partition-data-consume-constraint).
+
+- **For `AdaptiveBatchScheduler`** : The default constraint is `UNFINISHED_PRODUCERS` to perform pipelined-like shuffle. If the value is set to `ALL_PRODUCERS_FINISHED` or `ONLY_FINISHED_PRODUCERS`, performance may be degraded.
+- **If `SpeculativeExecution` is enabled** : The default constraint is `ONLY_FINISHED_PRODUCERS` to bring some performance optimization compared with blocking shuffle. Since producers and consumers have the opportunity to run at the same time, more speculative execution tasks may be created, and the cost of failover will also increase. If you want to fall back to the same behavior as blocking shuffle, you can configure this value to `ALL_PRODUCERS_FINISHED`. It is also important to note [...]
+
 ### Limitations
 
 Hybrid shuffle mode is still experimental and has some known limitations, which the Flink community is still working on eliminating.
 
 - **No support for Slot Sharing.** In hybrid shuffle mode, Flink currently forces each task to be executed in a dedicated slot exclusively. If slot sharing is explicitly specified, an error will occur.
-- **No support for Adaptive Batch Scheduler and Speculative Execution.** If adaptive batch scheduler is used in hybrid shuffle mode, an error will occur.
+- **No pipelined execution for dynamic graph.** If auto-parallelism (dynamic graph) is enabled, Adaptive Batch Scheduler will wait until upstream tasks finish to decide parallelism of downstream tasks, which means hybrid shuffle effectively fallback to blocking shuffle (`ALL_PRODUCERS_FINISHED` constraint).
 
 ## Performance Tuning
 
@@ -140,6 +153,7 @@ The following guidelines may help you to achieve better performance especially f
 1. Increase the total size of network memory. Currently, the default network memory size is pretty modest. For large scale jobs, it's suggested to increase the total [network memory fraction]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) to at least 0.2 to achieve better performance. At the same time, you may also need to adjust the [lower bound]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and [upper bound]({{< ref "docs/deployment/con [...]
 2. Increase the memory size for shuffle data write. For large scale jobs, it's suggested to increase the total size of network memory, the larger the memory that can be used in the shuffle write phase, the more opportunities downstream to read data directly from memory. You need to ensure that each `Result Partition` can be allocated to at least `numSubpartition + 1` buffers, otherwise the "Insufficient number of network buffers" will be encountered.
 3. Increase the memory size for shuffle data read. For large scale jobs, it's suggested to increase the size of the [shared read memory]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) to a larger value (for example, 256M or 512M). Because this memory is cut from the framework off-heap memory, you must increase [taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) by the [...]
+4. When `Hybrid Shuffle` is used, decreasing the number of [exclusive buffers per channel]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) will seriously affect the performance. Therefore, this value should not be set to `0`, and for large-scale job, this can be appropriately increased. It should be also noted that, for hybrid shuffle, [taskmanager.network.memory.read-buffer.required-per-gate.max]({{< ref "docs/deployment/config" >}}#taskmanager-netwo [...]
 {{< /tab >}}
 {{< /tabs >}}
 
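The elastic_scaling.md change in this commit widens the set of `execution.batch-shuffle-mode` values the Adaptive Batch Scheduler accepts. Before, it accepted only `ALL_EXCHANGES_BLOCKING`. Now it also accepts the two hybrid modes. The sketch below expresses that rule as a pre-submission check. It assumes a plain Python dict stands in for the Flink configuration. `check_shuffle_mode` and its constants are hypothetical names, not Flink API. Flink is not documented to raise `ValueError` for an unsupported mode; that exception is a choice made for this illustration.

```python
# Hypothetical helper, not part of Flink: encodes the documented rule that the
# Adaptive Batch Scheduler only accepts BLOCKING or HYBRID batch shuffle modes.
DEFAULT_BATCH_SHUFFLE_MODE = "ALL_EXCHANGES_BLOCKING"

SUPPORTED_BY_ADAPTIVE_BATCH_SCHEDULER = frozenset({
    "ALL_EXCHANGES_BLOCKING",
    "ALL_EXCHANGES_HYBRID_FULL",
    "ALL_EXCHANGES_HYBRID_SELECTIVE",
})


def check_shuffle_mode(conf: dict) -> str:
    """Return the effective execution.batch-shuffle-mode for a batch job.

    Raises ValueError when the Adaptive Batch Scheduler (the default batch
    scheduler) would run with a shuffle mode it does not support.
    """
    mode = conf.get("execution.batch-shuffle-mode", DEFAULT_BATCH_SHUFFLE_MODE)
    scheduler = conf.get("jobmanager.scheduler")
    # This sketch only recognises the `default` value quoted in the docs as
    # "another scheduler was explicitly configured"; then the rule is skipped.
    if scheduler is not None and scheduler.lower() == "default":
        return mode
    if mode not in SUPPORTED_BY_ADAPTIVE_BATCH_SCHEDULER:
        raise ValueError(
            f"Adaptive Batch Scheduler does not support shuffle mode {mode}"
        )
    return mode


if __name__ == "__main__":
    print(check_shuffle_mode({}))  # ALL_EXCHANGES_BLOCKING
    print(check_shuffle_mode(
        {"execution.batch-shuffle-mode": "ALL_EXCHANGES_HYBRID_FULL"}))
```

In this sketch, an unset key falls back to `ALL_EXCHANGES_BLOCKING`, which the documentation names as the default. Any value outside the three listed modes is rejected, for example `ALL_EXCHANGES_PIPELINED`.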
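The new "Data Consumption Constraints" section in batch_shuffle.md defines three values of `jobmanager.partition.hybrid.partition-data-consume-constraint`. Their defaults depend on speculative execution and auto-parallelism. The following hypothetical model makes those rules concrete. `ConsumeConstraint`, `can_consume`, `default_constraint` and `effective_constraint` are invented names. The code encodes only the documented behaviour, not Flink's actual scheduling logic.

```python
from enum import Enum


class ConsumeConstraint(Enum):
    """Hypothetical mirror of the three documented constraint values."""
    ALL_PRODUCERS_FINISHED = "ALL_PRODUCERS_FINISHED"
    ONLY_FINISHED_PRODUCERS = "ONLY_FINISHED_PRODUCERS"
    UNFINISHED_PRODUCERS = "UNFINISHED_PRODUCERS"


def can_consume(constraint, producer_finished, all_producers_finished):
    """Return True if a consumer may read one producer's hybrid partition data."""
    if constraint is ConsumeConstraint.ALL_PRODUCERS_FINISHED:
        # Blocking-like: wait until every producer has finished.
        return all_producers_finished
    if constraint is ConsumeConstraint.ONLY_FINISHED_PRODUCERS:
        # Read from a producer as soon as that producer has finished.
        return producer_finished
    # UNFINISHED_PRODUCERS: pipelined-like, read while the producer still runs.
    return True


def default_constraint(speculative_execution_enabled):
    """Documented defaults when the Adaptive Batch Scheduler is used."""
    if speculative_execution_enabled:
        return ConsumeConstraint.ONLY_FINISHED_PRODUCERS
    return ConsumeConstraint.UNFINISHED_PRODUCERS


def effective_constraint(configured, auto_parallelism_enabled):
    """With a dynamic graph, downstream parallelism is decided only after the
    upstream tasks finish, so hybrid shuffle behaves like blocking shuffle."""
    if auto_parallelism_enabled:
        return ConsumeConstraint.ALL_PRODUCERS_FINISHED
    return configured


if __name__ == "__main__":
    c = default_constraint(speculative_execution_enabled=False)
    # Prints: UNFINISHED_PRODUCERS True
    print(c.name, can_consume(c, producer_finished=False, all_producers_finished=False))
```

The model separates the configured value from the effective one. It does so because the documentation describes the auto-parallelism case as a fallback in behaviour, not as a change to the configured option.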
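The performance-tuning items in batch_shuffle.md boil down to a few checks. The helper below is hypothetical and assumes the caller already knows the number of subpartitions and the buffers available for writing. It flags three conditions the text warns about:

- fewer than `numSubpartition + 1` buffers for a `Result Partition`;
- `taskmanager.network.memory.buffers-per-channel` set to `0` under hybrid shuffle;
- `taskmanager.network.memory.read-buffer.required-per-gate.max` changed from its hybrid shuffle default of `Integer.MAX_VALUE`.

Only "Insufficient number of network buffers" is taken from the documentation; the other warning strings are illustrative.

```python
# Hypothetical sanity checks for hybrid shuffle tuning; not a Flink API.
INTEGER_MAX_VALUE = 2**31 - 1  # Java's Integer.MAX_VALUE


def min_write_buffers(num_subpartitions: int) -> int:
    """Minimum buffers a Result Partition needs: numSubpartition + 1."""
    return num_subpartitions + 1


def hybrid_tuning_warnings(
    num_subpartitions: int,
    write_buffers: int,
    buffers_per_channel: int,
    read_buffer_required_per_gate_max: int = INTEGER_MAX_VALUE,
) -> list:
    """Collect warnings for the settings the tuning guide cautions against."""
    warnings = []
    if write_buffers < min_write_buffers(num_subpartitions):
        # Mirrors the error text the documentation says users may encounter.
        warnings.append("Insufficient number of network buffers")
    if buffers_per_channel == 0:
        # Exclusive buffers per channel should not be 0 for hybrid shuffle.
        warnings.append("buffers-per-channel is 0; hybrid shuffle performance may suffer")
    if read_buffer_required_per_gate_max != INTEGER_MAX_VALUE:
        # Hybrid shuffle sets this option to Integer.MAX_VALUE by default.
        warnings.append("read-buffer.required-per-gate.max differs from the hybrid default")
    return warnings


if __name__ == "__main__":
    # 100 subpartitions need at least 101 buffers, so this prints:
    # ['Insufficient number of network buffers']
    print(hybrid_tuning_warnings(100, 100, 2))
```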