This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f744d3  [FLINK-17454][python] Specify a port number for gateway 
callback server from python gateway.
5f744d3 is described below

commit 5f744d3f81bcfb8f77164a5ec9caa4594851d4bf
Author: acqua.csq <acqua....@alibaba-inc.com>
AuthorDate: Fri May 8 23:29:12 2020 +0800

    [FLINK-17454][python] Specify a port number for gateway callback server 
from python gateway.
    
    This closes #12061
---
 flink-python/pyflink/java_gateway.py               | 11 +++--
 .../apache/flink/client/python/PythonEnvUtils.java | 55 +++++++++++++++++-----
 2 files changed, 49 insertions(+), 17 deletions(-)

diff --git a/flink-python/pyflink/java_gateway.py 
b/flink-python/pyflink/java_gateway.py
index d8e061b..33ab2ae 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -49,15 +49,19 @@ def get_gateway():
             # if Java Gateway is already running
             if 'PYFLINK_GATEWAY_PORT' in os.environ:
                 gateway_port = int(os.environ['PYFLINK_GATEWAY_PORT'])
-                callback_port = int(os.environ['PYFLINK_CALLBACK_PORT'])
                 gateway_param = GatewayParameters(port=gateway_port, 
auto_convert=True)
                 _gateway = JavaGateway(
                     gateway_parameters=gateway_param,
                     callback_server_parameters=CallbackServerParameters(
-                        port=callback_port, daemonize=True, 
daemonize_connections=True))
+                        port=0, daemonize=True, daemonize_connections=True))
             else:
                 _gateway = launch_gateway()
 
+            callback_server = _gateway.get_callback_server()
+            callback_server_listening_address = 
callback_server.get_listening_address()
+            callback_server_listening_port = 
callback_server.get_listening_port()
+            
_gateway.jvm.org.apache.flink.client.python.PythonEnvUtils.resetCallbackClient(
+                callback_server_listening_address, 
callback_server_listening_port)
             # import the flink view
             import_flink_view(_gateway)
             install_exception_handler()
@@ -102,7 +106,6 @@ def launch_gateway():
 
         with open(conn_info_file, "rb") as info:
             gateway_port = struct.unpack("!I", info.read(4))[0]
-            callback_port = struct.unpack("!I", info.read(4))[0]
     finally:
         shutil.rmtree(conn_info_dir)
 
@@ -110,7 +113,7 @@ def launch_gateway():
     gateway = JavaGateway(
         gateway_parameters=GatewayParameters(port=gateway_port, 
auto_convert=True),
         callback_server_parameters=CallbackServerParameters(
-            port=callback_port, daemonize=True, daemonize_connections=True))
+            port=0, daemonize=True, daemonize_connections=True))
 
     return gateway
 
diff --git 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index cf15b7b..76370dc 100644
--- 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++ 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -35,7 +35,10 @@ import py4j.GatewayServer;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.file.FileSystems;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
@@ -277,16 +280,7 @@ final class PythonEnvUtils {
                                        .gateway(new Gateway(new 
ConcurrentHashMap<String, Object>(), new CallbackClient(freePort)))
                                        .javaPort(0)
                                        .build();
-                               CallbackClient callbackClient = 
(CallbackClient) server.getCallbackClient();
-                               // The Java API of py4j does not provide 
approach to set "daemonize_connections" parameter.
-                               // Use reflect to daemonize the connection 
thread.
-                               Field executor = 
CallbackClient.class.getDeclaredField("executor");
-                               executor.setAccessible(true);
-                               ((ScheduledExecutorService) 
executor.get(callbackClient)).shutdown();
-                               executor.set(callbackClient, 
Executors.newScheduledThreadPool(1, Thread::new));
-                               Method setupCleaner = 
CallbackClient.class.getDeclaredMethod("setupCleaner");
-                               setupCleaner.setAccessible(true);
-                               setupCleaner.invoke(callbackClient);
+                               resetCallbackClientExecutorService(server);
                                gatewayServerFuture.complete(server);
                                server.start(true);
                        } catch (Throwable e) {
@@ -301,6 +295,43 @@ final class PythonEnvUtils {
        }
 
        /**
+        * Reset a daemon thread to the callback client thread pool so that the 
callback server can be terminated when the
+        * gateway server is shutting down. We need to shut down the non-daemon 
thread first, then set a new thread created
+        * in a daemon thread to the ExecutorService.
+        *
+        * @param gatewayServer the gateway which creates the callback server.
+        * */
+       private static void resetCallbackClientExecutorService(GatewayServer 
gatewayServer) throws NoSuchFieldException,
+               IllegalAccessException, NoSuchMethodException, 
InvocationTargetException {
+               CallbackClient callbackClient = (CallbackClient) 
gatewayServer.getCallbackClient();
+               // The Java API of py4j does not provide an approach to set the 
"daemonize_connections" parameter.
+               // Use reflection to daemonize the connection thread.
+               Field executor = 
CallbackClient.class.getDeclaredField("executor");
+               executor.setAccessible(true);
+               ((ScheduledExecutorService) 
executor.get(callbackClient)).shutdown();
+               executor.set(callbackClient, 
Executors.newScheduledThreadPool(1, Thread::new));
+               Method setupCleaner = 
CallbackClient.class.getDeclaredMethod("setupCleaner");
+               setupCleaner.setAccessible(true);
+               setupCleaner.invoke(callbackClient);
+       }
+
+       /**
+        * Reset the callback client of gatewayServer with the given 
callbackServerListeningAddress and callbackServerListeningPort
+        * after the callback server has started.
+        *
+        * @param callbackServerListeningAddress the listening address of the 
callback server.
+        * @param callbackServerListeningPort the listening port of the 
callback server.
+        * */
+       public static void resetCallbackClient(String 
callbackServerListeningAddress, int callbackServerListeningPort) throws
+               UnknownHostException, InvocationTargetException, 
NoSuchMethodException, IllegalAccessException,
+               NoSuchFieldException {
+
+               gatewayServer = getGatewayServer();
+               
gatewayServer.resetCallbackClient(InetAddress.getByName(callbackServerListeningAddress),
 callbackServerListeningPort);
+               resetCallbackClientExecutorService(gatewayServer);
+       }
+
+       /**
         * Py4J both supports Java to Python RPC and Python to Java RPC. The 
GatewayServer object is
         * the entry point of Java to Python RPC. Since the Py4j Python client 
will only be launched
         * only once, the GatewayServer object needs to be reused.
@@ -313,7 +344,7 @@ final class PythonEnvUtils {
 
        static void setGatewayServer(GatewayServer gatewayServer) {
                Preconditions.checkArgument(gatewayServer == null || 
PythonEnvUtils.gatewayServer == null);
-               PythonEnvUtils.gatewayServer = null;
+               PythonEnvUtils.gatewayServer = gatewayServer;
        }
 
        static Process launchPy4jPythonClient(
@@ -326,8 +357,6 @@ final class PythonEnvUtils {
                        config, entryPointScript, tmpDir);
                // set env variable PYFLINK_GATEWAY_PORT for connecting of 
python gateway in python process.
                pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", 
String.valueOf(gatewayServer.getListeningPort()));
-               // set env variable PYFLINK_CALLBACK_PORT for creating callback 
server in python process.
-               pythonEnv.systemEnv.put("PYFLINK_CALLBACK_PORT", 
String.valueOf(gatewayServer.getCallbackClient().getPort()));
                // start the python process.
                return PythonEnvUtils.startPythonProcess(pythonEnv, commands);
        }

Reply via email to