This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 873a56361bf [FLINK-32191][netty] support set keepalive options to NettyClient.
873a56361bf is described below

commit 873a56361bfd77c828ee743febc9dda2bb044791
Author: Weihua Hu <huweihua....@gmail.com>
AuthorDate: Tue Jun 20 20:53:30 2023 +0800

    [FLINK-32191][netty] support set keepalive options to NettyClient.
---
 .../generated/all_taskmanager_network_section.html |  18 +++
 .../netty_shuffle_environment_configuration.html   |  18 +++
 .../NettyShuffleEnvironmentOptions.java            |  30 +++++
 .../runtime/io/network/netty/NettyClient.java      |  40 ++++++
 .../runtime/io/network/netty/NettyConfig.java      |  13 ++
 .../runtime/io/network/netty/NettyClientTest.java  | 135 +++++++++++++++++++++
 6 files changed, 254 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
index 7d80b82ada8..b82a5bee720 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
@@ -122,6 +122,24 @@
             <td>Integer</td>
             <td>The number of Netty client threads.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.network.netty.client.tcp.keepCount</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The maximum number of keepalive probes TCP should send before Netty client dropping the connection. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.netty.client.tcp.keepIdleSec</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.netty.client.tcp.keepIntervalSec</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The time (in seconds) between individual keepalive probes. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.</td>
+        </tr>
         <tr>
             <td><h5>taskmanager.network.netty.num-arenas</h5></td>
             <td style="word-wrap: break-word;">-1</td>
diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
index cbea22beb26..9730839c0c4 100644
--- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
@@ -110,6 +110,24 @@
             <td>Integer</td>
             <td>The number of Netty client threads.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.network.netty.client.tcp.keepCount</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The maximum number of keepalive probes TCP should send before Netty client dropping the connection. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.netty.client.tcp.keepIdleSec</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.netty.client.tcp.keepIntervalSec</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The time (in seconds) between individual keepalive probes. Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, refer to https://bugs.openjdk.org/browse/JDK-8194298.</td>
+        </tr>
         <tr>
             <td><h5>taskmanager.network.netty.num-arenas</h5></td>
             <td style="word-wrap: break-word;">-1</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index cab12fe1972..1af9deadf35 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -491,6 +491,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + " based on the platform. Note that the \"epoll\" mode can get better performance, less GC and have more advanced features which are"
                                     + " only available on modern Linux.");
 
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> CLIENT_TCP_KEEP_IDLE_SECONDS =
+            key("taskmanager.network.netty.client.tcp.keepIdleSec")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes. "
+                                    + "Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, "
+                                    + "refer to https://bugs.openjdk.org/browse/JDK-8194298.");
+
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> CLIENT_TCP_KEEP_INTERVAL_SECONDS =
+            key("taskmanager.network.netty.client.tcp.keepIntervalSec")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The time (in seconds) between individual keepalive probes. "
+                                    + "Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, "
+                                    + "refer to https://bugs.openjdk.org/browse/JDK-8194298.");
+
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> CLIENT_TCP_KEEP_COUNT =
+            key("taskmanager.network.netty.client.tcp.keepCount")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The maximum number of keepalive probes TCP should send before Netty client dropping the connection. "
+                                    + "Note: This will not take effect when using netty transport type of nio with an older version of JDK 8, "
+                                    + "refer to https://bugs.openjdk.org/browse/JDK-8194298.");
+
     // ------------------------------------------------------------------------
     //  Partition Request Options
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
index a512fe868e8..755a8ee99e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
@@ -18,26 +18,33 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.annotation.VisibleForTesting;
+
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
 import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollChannelOption;
 import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollSocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioChannelOption;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 
+import jdk.net.ExtendedSocketOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
+import java.net.SocketOption;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -45,6 +52,10 @@ class NettyClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);
 
+    @VisibleForTesting static final String NIO_TCP_KEEPIDLE_KEY = "TCP_KEEPIDLE";
+    @VisibleForTesting static final String NIO_TCP_KEEPINTERVAL_KEY = "TCP_KEEPINTERVAL";
+    @VisibleForTesting static final String NIO_TCP_KEEPCOUNT_KEY = "TCP_KEEPCOUNT";
+
     private final NettyConfig config;
 
     private NettyProtocol protocol;
@@ -152,6 +163,27 @@ class NettyClient {
                 new NioEventLoopGroup(
                         config.getClientNumThreads(), NettyServer.getNamedThreadFactory(name));
         bootstrap.group(nioGroup).channel(NioSocketChannel.class);
+
+        config.getTcpKeepIdleInSeconds()
+                .ifPresent(idle -> setNioKeepaliveOptions(NIO_TCP_KEEPIDLE_KEY, idle));
+        config.getTcpKeepInternalInSeconds()
+                .ifPresent(interval -> setNioKeepaliveOptions(NIO_TCP_KEEPINTERVAL_KEY, interval));
+        config.getTcpKeepCount()
+                .ifPresent(count -> setNioKeepaliveOptions(NIO_TCP_KEEPCOUNT_KEY, count));
+    }
+
+    @SuppressWarnings("unchecked")
+    private void setNioKeepaliveOptions(String option, int value) {
+        try {
+            Field field = ExtendedSocketOptions.class.getField(option);
+            bootstrap.option(NioChannelOption.of((SocketOption<Integer>) field.get(null)), value);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            LOG.error(
+                    "Ignore keepalive option {}, this may be due to using netty transport type of nio and an older version of jdk 8,"
+                            + " refer to https://bugs.openjdk.org/browse/JDK-8194298",
+                    option,
+                    e);
+        }
     }
 
     private void initEpollBootstrap() {
@@ -163,6 +195,14 @@ class NettyClient {
                 new EpollEventLoopGroup(
                         config.getClientNumThreads(), NettyServer.getNamedThreadFactory(name));
         bootstrap.group(epollGroup).channel(EpollSocketChannel.class);
+
+        config.getTcpKeepIdleInSeconds()
+                .ifPresent(idle -> bootstrap.option(EpollChannelOption.TCP_KEEPIDLE, idle));
+        config.getTcpKeepInternalInSeconds()
+                .ifPresent(
+                        interval -> bootstrap.option(EpollChannelOption.TCP_KEEPINTVL, interval));
+        config.getTcpKeepCount()
+                .ifPresent(count -> bootstrap.option(EpollChannelOption.TCP_KEEPCNT, count));
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 979295b8d65..226b834d389 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.net.InetAddress;
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -142,6 +143,18 @@ public class NettyConfig {
         }
     }
 
+    public Optional<Integer> getTcpKeepIdleInSeconds() {
+        return config.getOptional(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_IDLE_SECONDS);
+    }
+
+    public Optional<Integer> getTcpKeepInternalInSeconds() {
+        return config.getOptional(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_INTERVAL_SECONDS);
+    }
+
+    public Optional<Integer> getTcpKeepCount() {
+        return config.getOptional(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_COUNT);
+    }
+
     @Nullable
     public SSLHandlerFactory createClientSSLEngineFactory() throws Exception {
         return getSSLEnabled() ? SSLUtils.createInternalClientSSLEngineFactory(config) : null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientTest.java
new file mode 100644
index 00000000000..6e29197ede7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollChannelOption;
+
+import jdk.net.ExtendedSocketOptions;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/** Tests for {@link NettyClient}. */
+public class NettyClientTest {
+    @Test
+    void testSetKeepaliveOptionWithNioConfigurable() throws Exception {
+        assumeThat(keepaliveForNioConfigurable()).isTrue();
+
+        final Configuration config = new Configuration();
+        config.set(NettyShuffleEnvironmentOptions.TRANSPORT_TYPE, "nio");
+        config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_IDLE_SECONDS, 300);
+        config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_INTERVAL_SECONDS, 10);
+        config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_COUNT, 8);
+
+        try (NetUtils.Port clientPort = NetUtils.getAvailablePort()) {
+            final NettyClient client = createNettyClient(config, clientPort);
+            Map<String, Object> options =
+                    client.getBootstrap().config().options().entrySet().stream()
+                            .collect(Collectors.toMap(e -> e.getKey().name(), Map.Entry::getValue));
+            assertThat(options)
+                    .containsEntry(NettyClient.NIO_TCP_KEEPIDLE_KEY, 300)
+                    .containsEntry(NettyClient.NIO_TCP_KEEPINTERVAL_KEY, 10)
+                    .containsEntry(NettyClient.NIO_TCP_KEEPCOUNT_KEY, 8);
+        }
+    }
+
+    /**
+     * Test that keepalive options will not take effect when using netty transport type of nio with
+     * an older version of JDK 8.
+     */
+    @Test
+    void testSetKeepaliveOptionWithNioNotConfigurable() throws Exception {
+        assumeThat(keepaliveForNioConfigurable()).isFalse();
+
+        final Configuration config = new Configuration();
+        config.set(NettyShuffleEnvironmentOptions.TRANSPORT_TYPE, "nio");
+        config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_IDLE_SECONDS, 300);
+        config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_INTERVAL_SECONDS, 10);
+        config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_COUNT, 8);
+
+        try (NetUtils.Port clientPort = NetUtils.getAvailablePort()) {
+            final NettyClient client = createNettyClient(config, clientPort);
+            Map<String, Object> options =
+                    client.getBootstrap().config().options().entrySet().stream()
+                            .collect(Collectors.toMap(e -> e.getKey().name(), Map.Entry::getValue));
+            assertThat(options)
+                    .doesNotContainKeys(
+                            NettyClient.NIO_TCP_KEEPIDLE_KEY,
+                            NettyClient.NIO_TCP_KEEPINTERVAL_KEY,
+                            NettyClient.NIO_TCP_KEEPCOUNT_KEY);
+        }
+    }
+
+    @Test
+    void testSetKeepaliveOptionWithEpoll() throws Exception {
+        assumeThat(Epoll.isAvailable()).isTrue();
+
+        final Configuration config = new Configuration();
+        config.set(NettyShuffleEnvironmentOptions.TRANSPORT_TYPE, "epoll");
+        config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_IDLE_SECONDS, 300);
+        config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_INTERVAL_SECONDS, 10);
+        config.set(NettyShuffleEnvironmentOptions.CLIENT_TCP_KEEP_COUNT, 8);
+
+        try (NetUtils.Port clientPort = NetUtils.getAvailablePort()) {
+            final NettyClient client = createNettyClient(config, clientPort);
+            Map<ChannelOption<?>, Object> options = client.getBootstrap().config().options();
+            assertThat(options)
+                    .containsEntry(EpollChannelOption.TCP_KEEPIDLE, 300)
+                    .containsEntry(EpollChannelOption.TCP_KEEPINTVL, 10)
+                    .containsEntry(EpollChannelOption.TCP_KEEPCNT, 8);
+        }
+    }
+
+    private static boolean keepaliveForNioConfigurable() {
+        try {
+            ExtendedSocketOptions.class.getField(NettyClient.NIO_TCP_KEEPIDLE_KEY);
+        } catch (NoSuchFieldException e) {
+            return false;
+        }
+        return true;
+    }
+
+    private static NettyClient createNettyClient(Configuration config, NetUtils.Port port)
+            throws Exception {
+
+        final NettyConfig nettyClientConfig =
+                new NettyConfig(
+                        InetAddress.getLoopbackAddress(),
+                        port.getPort(),
+                        NettyTestUtil.DEFAULT_SEGMENT_SIZE,
+                        1,
+                        config);
+
+        final NettyBufferPool bufferPool = new NettyBufferPool(1);
+        final NettyProtocol protocol = new NettyProtocol(null, null);
+
+        return NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool);
+    }
+}

Reply via email to