[GitHub] flink pull request #5317: [FLINK-8458] Add the switch for keeping both the o...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5317 ---
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5317#discussion_r171575083

--- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---
@@ -269,15 +269,21 @@
     public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
             key("taskmanager.network.memory.buffers-per-channel")
             .defaultValue(2)
-            .withDescription("Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).");
+            .withDescription("Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
+                "In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
+                " configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
+                " for parallel serialization.");
 
     /**
      * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
      */
     public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
             key("taskmanager.network.memory.floating-buffers-per-gate")
             .defaultValue(8)
-            .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).");
+            .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
+                " In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
+                " The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
+                " help relieve back-pressure caused by unbalanced data distribution among the subpartitions.");
--- End diff --

Yeah, I already added it.

---
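The two option descriptions above imply a simple upper bound for one input gate in credit-based mode: each input channel owns `buffers-per-channel` exclusive buffers (its initial credits), and the gate shares up to `floating-buffers-per-gate` floating buffers on top. The sketch below only does that arithmetic. The class and method names are hypothetical, it treats every channel as remote and ignores other details of Flink's buffer pools, and it assumes the 32 KiB default for `taskmanager.memory.segment-size` when converting buffers to bytes.

```java
/** Hypothetical helper: upper bound on buffers one input gate may hold in credit-based mode. */
public final class GateBufferBudget {

    // Assumption: default segment size (taskmanager.memory.segment-size) of 32 KiB.
    static final long SEGMENT_SIZE_BYTES = 32 * 1024;

    /** Exclusive buffers for every channel plus the floating buffers shared by the whole gate. */
    public static int maxBuffersPerGate(int numChannels, int buffersPerChannel, int floatingBuffersPerGate) {
        if (numChannels < 0 || buffersPerChannel < 0 || floatingBuffersPerGate < 0) {
            throw new IllegalArgumentException("arguments must be non-negative");
        }
        return numChannels * buffersPerChannel + floatingBuffersPerGate;
    }

    /** Same bound expressed in bytes, using the assumed segment size. */
    public static long maxBytesPerGate(int numChannels, int buffersPerChannel, int floatingBuffersPerGate) {
        return (long) maxBuffersPerGate(numChannels, buffersPerChannel, floatingBuffersPerGate) * SEGMENT_SIZE_BYTES;
    }

    public static void main(String[] args) {
        // Defaults from the diff: 2 buffers per channel, 8 floating buffers per gate.
        System.out.println(maxBuffersPerGate(100, 2, 8) + " buffers, "
                + maxBytesPerGate(100, 2, 8) + " bytes");
    }
}
```

With the defaults, a gate reading from 100 channels can hold up to 208 buffers, or 6.5 MiB with 32 KiB segments, so raising `buffers-per-channel` scales with the channel count while `floating-buffers-per-gate` does not.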
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5317#discussion_r166921740

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---
@@ -77,14 +78,35 @@ void requestSubpartitionView(
         }
     }
 
-    InputChannelID getReceiverId() {
+    @Override
+    public void addCredit(int creditDeltas) {
+    }
+
+    @Override
+    public void setRegisteredAsAvailable(boolean isRegisteredAvailable) {
+    }
+
+    @Override
+    public boolean isRegisteredAsAvailable() {
+        return false;
--- End diff --

Why is this not implemented?

---
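The diff above adds three credit-related methods to the reader used in the old mode but leaves them as no-ops. The sketch below is a simplified, hypothetical model of the split between the two modes, not Flink's code: the interface and method names mirror the diff, while `isAvailable(boolean)`, `onBufferSent()` and `credits()` are invented for illustration. It assumes that the registration flag only records whether a reader already sits in the queue of available readers; under that assumption the flag is still meaningful without credits, so the old-mode reader can track it even though `addCredit` stays empty.

```java
/** Simplified, hypothetical model of the reader abstraction in the diff; not Flink's code. */
public final class ReaderSketch {

    /** The three methods from the diff, plus a hypothetical availability check. */
    public interface NetworkSequenceViewReader {
        void addCredit(int creditDeltas);
        void setRegisteredAsAvailable(boolean isRegisteredAvailable);
        boolean isRegisteredAsAvailable();
        /** Hypothetical: may the reader send its next buffer right now? */
        boolean isAvailable(boolean hasBufferedData);
    }

    /** Credit-based mode: a buffer may only be sent while the receiver has granted credit. */
    public static final class CreditBasedReader implements NetworkSequenceViewReader {
        private int numCreditsAvailable;
        private boolean registeredAsAvailable;

        public CreditBasedReader(int initialCredit) {
            this.numCreditsAvailable = initialCredit;
        }

        @Override public void addCredit(int creditDeltas) { numCreditsAvailable += creditDeltas; }
        @Override public void setRegisteredAsAvailable(boolean registered) { registeredAsAvailable = registered; }
        @Override public boolean isRegisteredAsAvailable() { return registeredAsAvailable; }
        @Override public boolean isAvailable(boolean hasBufferedData) {
            return hasBufferedData && numCreditsAvailable > 0;
        }

        /** Hypothetical hook: every sent buffer spends one credit. */
        public void onBufferSent() {
            if (numCreditsAvailable <= 0) {
                throw new IllegalStateException("sent a buffer without credit");
            }
            numCreditsAvailable--;
        }

        public int credits() { return numCreditsAvailable; }
    }

    /** Old mode: credit is meaningless, but the "already queued" flag is still tracked. */
    public static final class SequenceNumberingReader implements NetworkSequenceViewReader {
        private boolean registeredAsAvailable;

        @Override public void addCredit(int creditDeltas) { /* no credit in the old mode */ }
        @Override public void setRegisteredAsAvailable(boolean registered) { registeredAsAvailable = registered; }
        @Override public boolean isRegisteredAsAvailable() { return registeredAsAvailable; }
        @Override public boolean isAvailable(boolean hasBufferedData) { return hasBufferedData; }
    }

    public static void main(String[] args) {
        CreditBasedReader reader = new CreditBasedReader(2);
        reader.onBufferSent();
        reader.onBufferSent();
        System.out.println(reader.isAvailable(true)); // false: no credit left
        reader.addCredit(1);
        System.out.println(reader.isAvailable(true)); // true
        SequenceNumberingReader oldReader = new SequenceNumberingReader();
        oldReader.setRegisteredAsAvailable(true);
        System.out.println(oldReader.isRegisteredAsAvailable()); // true
    }
}
```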
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5317#discussion_r171525131

--- Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---
@@ -269,15 +269,21 @@
     public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
             key("taskmanager.network.memory.buffers-per-channel")
             .defaultValue(2)
-            .withDescription("Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).");
+            .withDescription("Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
+                "In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
+                " configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
+                " for parallel serialization.");
 
     /**
      * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
      */
     public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
             key("taskmanager.network.memory.floating-buffers-per-gate")
             .defaultValue(8)
-            .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).");
+            .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
+                " In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
+                " The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
+                " help relieve back-pressure caused by unbalanced data distribution among the subpartitions.");
--- End diff --

Add: `This value should be increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.`?

---
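The suggested sentence follows from a bandwidth-delay argument: a sender that may only transmit against credit can have at most one round trip's worth of credited buffers in flight, so keeping a link busy needs roughly bandwidth × RTT / segment size buffers. The helper below is a hypothetical back-of-the-envelope estimate, not a Flink API or an official sizing rule, and it assumes 32 KiB segments.

```java
/** Hypothetical bandwidth-delay estimate for credit-limited sending; not part of Flink. */
public final class BdpEstimate {

    /**
     * Buffers needed so that a credit-limited sender can keep a link busy:
     * ceil(bytesPerSecond * rttSeconds / segmentSizeBytes).
     */
    public static long buffersToCoverBdp(double bytesPerSecond, double rttSeconds, long segmentSizeBytes) {
        if (bytesPerSecond <= 0 || rttSeconds < 0 || segmentSizeBytes <= 0) {
            throw new IllegalArgumentException("invalid arguments");
        }
        double bytesInFlight = bytesPerSecond * rttSeconds;
        return (long) Math.ceil(bytesInFlight / segmentSizeBytes);
    }

    public static void main(String[] args) {
        long segment = 32 * 1024; // assumed default segment size
        // 1 GiB/s link: compare a 1 ms and a 10 ms round trip.
        System.out.println(buffersToCoverBdp(1 << 30, 0.001, segment));
        System.out.println(buffersToCoverBdp(1 << 30, 0.010, segment));
    }
}
```

Under these assumptions, growing the round trip from 1 ms to 10 ms raises the estimate from 33 to 328 buffers, which is the intuition behind increasing the floating buffers for higher round-trip times.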
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5317#discussion_r163155607

--- Diff: docs/ops/config.md ---
@@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager and TaskManagers.
 - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`.
 
+- `taskmanager.network.memory.buffers-per-channel`: Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). Especially in credit-based flow control mode, it indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receving in-flight data in the subpartition and 1 buffer is for parallel serialization.
+
+- `taskmanager.network.memory.floating-buffers-per-gate`: Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, it indicates how many floating credits are shared for all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback. So the floating buffers can help relief back-pressure caused by imbalance data distribution among subpartitions.
+
--- End diff --

Thanks for the polish, alpinegizmo. I will apply the fixes above.

---
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/5317#discussion_r163155437

--- Diff: docs/ops/config.md ---
@@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager and TaskManagers.
 - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`.
 
+- `taskmanager.network.memory.buffers-per-channel`: Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). Especially in credit-based flow control mode, it indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receving in-flight data in the subpartition and 1 buffer is for parallel serialization.
+
--- End diff --

It is also used in the current (old) mode, and in most cases there is no need to change the default value there. In the new credit-based mode, a value greater than 2 can bring benefits if the bottleneck is caused by slow downstream processing: the larger the value, the lower the probability of blocking the upstream and causing back-pressure. However, we should also consider the total available buffer resources when setting this parameter.

---
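The answer above can be illustrated with a toy model of credit-based sending: the sender may only put a buffer on the wire while it holds a credit, and the receiver hands back one credit for each buffer its task consumes. The simulation below is a deliberately simplified, hypothetical sketch (one channel, no network delay, a consumer that works in bursts); it counts the ticks in which the sender is blocked for a given number of exclusive credits.

```java
/** Toy, hypothetical model of one credit-based channel; not Flink code. */
public final class CreditSimulation {

    /**
     * Simulates {@code ticks} steps. In each step the sender tries to send one buffer,
     * which needs a free credit (fewer than {@code credits} buffers queued at the receiver);
     * then the receiver consumes up to {@code consumePattern[t % length]} buffers,
     * returning one credit per consumed buffer.
     *
     * @return number of steps in which the sender was blocked for lack of credit
     */
    public static int blockedTicks(int credits, int[] consumePattern, int ticks) {
        int queued = 0;   // buffers waiting at the receiver (= credits in use)
        int blocked = 0;
        for (int t = 0; t < ticks; t++) {
            if (queued < credits) {
                queued++;          // send one buffer, spending one credit
            } else {
                blocked++;         // no credit left: the upstream is back-pressured
            }
            int consumed = Math.min(queued, consumePattern[t % consumePattern.length]);
            queued -= consumed;    // each consumed buffer hands a credit back
        }
        return blocked;
    }

    public static void main(String[] args) {
        // Bursty consumer: idle for 3 ticks, then drains 4 buffers (average 1 per tick).
        int[] bursty = {0, 0, 0, 4};
        for (int credits = 2; credits <= 4; credits++) {
            System.out.println(credits + " credits -> blocked "
                    + blockedTicks(credits, bursty, 100) + " of 100 ticks");
        }
    }
}
```

In this model, 2, 3 and 4 credits block the sender for 50, 25 and 0 of 100 ticks, because the extra credits absorb the consumer's idle phase. The same model also shows the limit: if the consumer is slower than the sender on average, the queue fills up regardless of the credit count, and every extra credit still costs a buffer from the TaskManager's network memory.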
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5317#discussion_r16290

--- Diff: docs/ops/config.md ---
@@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager and TaskManagers.
 - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`.
 
+- `taskmanager.network.memory.buffers-per-channel`: Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). Especially in credit-based flow control mode, it indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receving in-flight data in the subpartition and 1 buffer is for parallel serialization.
+
--- End diff --

This needs some clarification. Is `taskmanager.network.memory.buffers-per-channel` only used in credit-based flow control mode? Does choosing a value greater than 2 provide any benefit?

---
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5317#discussion_r162906372

--- Diff: docs/ops/config.md ---
@@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager and TaskManagers.
 - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`.
 
+- `taskmanager.network.memory.buffers-per-channel`: Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). Especially in credit-based flow control mode, it indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receving in-flight data in the subpartition and 1 buffer is for parallel serialization.
+
+- `taskmanager.network.memory.floating-buffers-per-gate`: Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, it indicates how many floating credits are shared for all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback. So the floating buffers can help relief back-pressure caused by imbalance data distribution among subpartitions.
+
--- End diff --

Number of extra network buffers used by each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions.

---
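The backlog feedback in the suggested description can be pictured as follows: the sender tells the receiving channel how many buffers are still waiting in its subpartition, and the channel asks the gate's shared pool for floating buffers to cover what its exclusive buffers cannot. The sketch below is a hypothetical simplification, not Flink's request logic: channels are served in array order, and each one is assumed to want `max(0, backlog - exclusive)` floating buffers.

```java
import java.util.Arrays;

/** Hypothetical simplification of backlog-driven floating buffer assignment; not Flink code. */
public final class FloatingBufferSketch {

    /**
     * Assigns buffers from a shared pool of {@code floatingBuffers}, serving channels in
     * array order. Each channel wants one floating buffer per backlog buffer that its
     * {@code exclusivePerChannel} exclusive buffers cannot cover.
     */
    public static int[] assign(int[] backlogs, int exclusivePerChannel, int floatingBuffers) {
        int[] assigned = new int[backlogs.length];
        int remaining = floatingBuffers;
        for (int i = 0; i < backlogs.length && remaining > 0; i++) {
            int wanted = Math.max(0, backlogs[i] - exclusivePerChannel);
            assigned[i] = Math.min(wanted, remaining);
            remaining -= assigned[i];
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Skewed input: channels 0 and 2 are hot, channel 1 is almost idle.
        int[] backlogs = {8, 1, 7};
        System.out.println(Arrays.toString(assign(backlogs, 2, 8))); // [6, 0, 2]
    }
}
```

The nearly idle channel keeps only its exclusive buffers while the shared pool flows to the hot channels, which is the sense in which floating buffers relieve back-pressure from unbalanced data. The first-come order in this sketch also means one hot channel can drain the pool before another asks.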
GitHub user zhijiangW opened a pull request: https://github.com/apache/flink/pull/5317

[FLINK-8458] Add the switch for keeping both the old mode and the new credit-based mode

## What is the purpose of the change

*After the whole credit-based flow control feature is done, we should add a config parameter to switch the new credit-based mode on or off. This way, we can roll back to the old network mode in case of unexpected risks.*

*The parameter is defined as `taskmanager.network.credit-based-flow-control.enabled`, and its default value is `true`. This switch may be removed after the next release.*

*This PR is based on #4552, whose commit is also included here so that the Travis build passes.*

## Brief change log

  - *Abstract the `NetworkClientHandler` interface for different implementations in the two modes*
  - *Abstract the `NetworkSequenceViewReader` interface for different implementations in the two modes*
  - *Define `taskmanager.network.credit-based-flow-control.enabled` in `TaskManagerOptions`*

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhijiangW/flink FLINK-8458

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5317.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5317

commit a3b41f51173adaa382b42877142664d2b09101f7
Author: Zhijiang
Date: 2017-09-30T06:36:19Z

    [FLINK-7456][network] Implement Netty sender incoming pipeline for credit-based

commit a8154989f9c93e71e3051d7184c0f02316f1a3c7
Author: Zhijiang
Date: 2018-01-17T06:15:04Z

    [FLINK-8458] Add the switch for keeping both the old mode and the new credit-based mode

---
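The PR describes a single boolean option that selects between two implementations of the same abstractions (`NetworkClientHandler`, `NetworkSequenceViewReader`). The sketch below shows that pattern using plain `java.util.Properties` instead of Flink's `Configuration`, so it runs without Flink on the classpath. The option key and its default of `true` come from the PR; the handler classes and `mode()` method are hypothetical stand-ins, not the classes the PR adds.

```java
import java.util.Properties;

/** Hypothetical sketch of a config switch choosing between two implementations. */
public final class FlowControlSwitch {

    // Key and default (true) as described in the PR.
    static final String CREDIT_BASED_KEY = "taskmanager.network.credit-based-flow-control.enabled";

    /** Stand-in for the abstracted client handler interface. */
    public interface NetworkClientHandler {
        String mode();
    }

    /** Hypothetical stand-in for the new credit-based handler. */
    static final class CreditBasedClientHandler implements NetworkClientHandler {
        @Override public String mode() { return "credit-based"; }
    }

    /** Hypothetical stand-in for the old handler without credits. */
    static final class LegacyClientHandler implements NetworkClientHandler {
        @Override public String mode() { return "old"; }
    }

    /** Reads the switch (default true) and picks the matching implementation. */
    public static NetworkClientHandler createHandler(Properties config) {
        boolean creditBased = Boolean.parseBoolean(config.getProperty(CREDIT_BASED_KEY, "true"));
        return creditBased ? new CreditBasedClientHandler() : new LegacyClientHandler();
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(createHandler(config).mode()); // credit-based (default)
        config.setProperty(CREDIT_BASED_KEY, "false");
        System.out.println(createHandler(config).mode()); // old
    }
}
```

Keeping the decision in one factory means the rest of the network stack only sees the interface, so removing the switch later touches a single place. Note that `Boolean.parseBoolean` treats any value other than a case-insensitive `true` as `false`, so a typo in the value would silently select the old mode; a stricter parser could reject unknown values instead.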