This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 03dc734  [FLINK-14481] Unify port range check in couple of missing places
03dc734 is described below

commit 03dc734a393ed5a043590f054ed331ae0434fdb0
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Fri Nov 22 10:53:39 2019 +0100

    [FLINK-14481] Unify port range check in couple of missing places
---
 .../java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java | 3 ++-
 .../apache/flink/streaming/api/functions/sink/SocketClientSink.java | 3 ++-
 .../flink/streaming/api/functions/source/SocketTextStreamFunction.java | 3 ++-
 3 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index b2dd743..0c4b877 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -41,6 +41,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.flink.util.NetUtils.isValidClientPort;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -192,7 +193,7 @@ public class AkkaRpcServiceUtils {
 
 		checkNotNull(hostname, "hostname is null");
 		checkNotNull(endpointName, "endpointName is null");
-		checkArgument(port > 0 && port <= 65535, "port must be in [1, 65535]");
+		checkArgument(isValidClientPort(port), "port must be in [1, 65535]");
 
 		final String protocolPrefix = akkaProtocol == AkkaProtocol.SSL_TCP ? AKKA_SSL_TCP : AKKA_TCP;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index 80e0dbe..3bead5c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
 
+import static org.apache.flink.util.NetUtils.isValidClientPort;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -104,7 +105,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 	 */
 	public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema,
 							int maxNumRetries, boolean autoflush) {
-		checkArgument(port > 0 && port < 65536, "port is out of range");
+		checkArgument(isValidClientPort(port), "port is out of range");
 		checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
 
 		this.hostName = checkNotNull(hostName, "hostname must not be null");
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
index 8d04257..af49cc1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
@@ -28,6 +28,7 @@ import java.io.InputStreamReader;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 
+import static org.apache.flink.util.NetUtils.isValidClientPort;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -71,7 +72,7 @@ public class SocketTextStreamFunction implements SourceFunction<String> {
 	}
 
 	public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxNumRetries, long delayBetweenRetries) {
-		checkArgument(port > 0 && port < 65536, "port is out of range");
+		checkArgument(isValidClientPort(port), "port is out of range");
 		checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
 		checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
 