This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 03dc734  [FLINK-14481] Unify port range check in couple of missing places
03dc734 is described below

commit 03dc734a393ed5a043590f054ed331ae0434fdb0
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Fri Nov 22 10:53:39 2019 +0100

    [FLINK-14481] Unify port range check in couple of missing places
---
 .../java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java    | 3 ++-
 .../apache/flink/streaming/api/functions/sink/SocketClientSink.java    | 3 ++-
 .../flink/streaming/api/functions/source/SocketTextStreamFunction.java | 3 ++-
 3 files changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index b2dd743..0c4b877 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -41,6 +41,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.flink.util.NetUtils.isValidClientPort;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -192,7 +193,7 @@ public class AkkaRpcServiceUtils {
 
                checkNotNull(hostname, "hostname is null");
                checkNotNull(endpointName, "endpointName is null");
-               checkArgument(port > 0 && port <= 65535, "port must be in [1, 
65535]");
+               checkArgument(isValidClientPort(port), "port must be in [1, 
65535]");
 
                final String protocolPrefix = akkaProtocol == 
AkkaProtocol.SSL_TCP ? AKKA_SSL_TCP : AKKA_TCP;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index 80e0dbe..3bead5c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
 
+import static org.apache.flink.util.NetUtils.isValidClientPort;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -104,7 +105,7 @@ public class SocketClientSink<IN> extends 
RichSinkFunction<IN> {
         */
        public SocketClientSink(String hostName, int port, 
SerializationSchema<IN> schema,
                                                        int maxNumRetries, 
boolean autoflush) {
-               checkArgument(port > 0 && port < 65536, "port is out of range");
+               checkArgument(isValidClientPort(port), "port is out of range");
                checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero 
or larger (num retries), or -1 (infinite retries)");
 
                this.hostName = checkNotNull(hostName, "hostname must not be 
null");
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
index 8d04257..af49cc1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
@@ -28,6 +28,7 @@ import java.io.InputStreamReader;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 
+import static org.apache.flink.util.NetUtils.isValidClientPort;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -71,7 +72,7 @@ public class SocketTextStreamFunction implements 
SourceFunction<String> {
        }
 
        public SocketTextStreamFunction(String hostname, int port, String 
delimiter, long maxNumRetries, long delayBetweenRetries) {
-               checkArgument(port > 0 && port < 65536, "port is out of range");
+               checkArgument(isValidClientPort(port), "port is out of range");
                checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero 
or larger (num retries), or -1 (infinite retries)");
                checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries 
must be zero or positive");
 

Reply via email to