This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 5f744d3 [FLINK-17454][python] Specify a port number for gateway callback server from python gateway. 5f744d3 is described below commit 5f744d3f81bcfb8f77164a5ec9caa4594851d4bf Author: acqua.csq <acqua....@alibaba-inc.com> AuthorDate: Fri May 8 23:29:12 2020 +0800 [FLINK-17454][python] Specify a port number for gateway callback server from python gateway. This closes #12061 --- flink-python/pyflink/java_gateway.py | 11 +++-- .../apache/flink/client/python/PythonEnvUtils.java | 55 +++++++++++++++++----- 2 files changed, 49 insertions(+), 17 deletions(-) diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py index d8e061b..33ab2ae 100644 --- a/flink-python/pyflink/java_gateway.py +++ b/flink-python/pyflink/java_gateway.py @@ -49,15 +49,19 @@ def get_gateway(): # if Java Gateway is already running if 'PYFLINK_GATEWAY_PORT' in os.environ: gateway_port = int(os.environ['PYFLINK_GATEWAY_PORT']) - callback_port = int(os.environ['PYFLINK_CALLBACK_PORT']) gateway_param = GatewayParameters(port=gateway_port, auto_convert=True) _gateway = JavaGateway( gateway_parameters=gateway_param, callback_server_parameters=CallbackServerParameters( - port=callback_port, daemonize=True, daemonize_connections=True)) + port=0, daemonize=True, daemonize_connections=True)) else: _gateway = launch_gateway() + callback_server = _gateway.get_callback_server() + callback_server_listening_address = callback_server.get_listening_address() + callback_server_listening_port = callback_server.get_listening_port() + _gateway.jvm.org.apache.flink.client.python.PythonEnvUtils.resetCallbackClient( + callback_server_listening_address, callback_server_listening_port) # import the flink view import_flink_view(_gateway) install_exception_handler() @@ -102,7 +106,6 @@ def launch_gateway(): with open(conn_info_file, "rb") as info: gateway_port = struct.unpack("!I", info.read(4))[0] - callback_port = struct.unpack("!I", info.read(4))[0] finally: shutil.rmtree(conn_info_dir) @@ -110,7 +113,7 @@ def launch_gateway(): gateway = JavaGateway( gateway_parameters=GatewayParameters(port=gateway_port, auto_convert=True), callback_server_parameters=CallbackServerParameters( - port=callback_port, daemonize=True, daemonize_connections=True)) + port=0, daemonize=True, daemonize_connections=True)) return gateway diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java index cf15b7b..76370dc 100644 --- a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java +++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java @@ -35,7 +35,10 @@ import py4j.GatewayServer; import java.io.File; import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.file.FileSystems; import java.nio.file.FileVisitResult; import java.nio.file.Files; @@ -277,16 +280,7 @@ final class PythonEnvUtils { .gateway(new Gateway(new ConcurrentHashMap<String, Object>(), new CallbackClient(freePort))) .javaPort(0) .build(); - CallbackClient callbackClient = (CallbackClient) server.getCallbackClient(); - // The Java API of py4j does not provide approach to set "daemonize_connections" parameter. - // Use reflect to daemonize the connection thread. - Field executor = CallbackClient.class.getDeclaredField("executor"); - executor.setAccessible(true); - ((ScheduledExecutorService) executor.get(callbackClient)).shutdown(); - executor.set(callbackClient, Executors.newScheduledThreadPool(1, Thread::new)); - Method setupCleaner = CallbackClient.class.getDeclaredMethod("setupCleaner"); - setupCleaner.setAccessible(true); - setupCleaner.invoke(callbackClient); + resetCallbackClientExecutorService(server); gatewayServerFuture.complete(server); server.start(true); } catch (Throwable e) { @@ -301,6 +295,43 @@ final class PythonEnvUtils { } /** + * Reset a daemon thread to the callback client thread pool so that the callback server can be terminated when gate + * way server is shutting down. We need to shut down the none-daemon thread firstly, then set a new thread created + * in a daemon thread to the ExecutorService. + * + * @param gatewayServer the gateway which creates the callback server. + * */ + private static void resetCallbackClientExecutorService(GatewayServer gatewayServer) throws NoSuchFieldException, + IllegalAccessException, NoSuchMethodException, InvocationTargetException { + CallbackClient callbackClient = (CallbackClient) gatewayServer.getCallbackClient(); + // The Java API of py4j does not provide approach to set "daemonize_connections" parameter. + // Use reflect to daemonize the connection thread. + Field executor = CallbackClient.class.getDeclaredField("executor"); + executor.setAccessible(true); + ((ScheduledExecutorService) executor.get(callbackClient)).shutdown(); + executor.set(callbackClient, Executors.newScheduledThreadPool(1, Thread::new)); + Method setupCleaner = CallbackClient.class.getDeclaredMethod("setupCleaner"); + setupCleaner.setAccessible(true); + setupCleaner.invoke(callbackClient); + } + + /** + * Reset the callback client of gatewayServer with the given callbackListeningAddress and callbackListeningPort + * after the callback server started. + * + * @param callbackServerListeningAddress the listening address of the callback server. + * @param callbackServerListeningPort the listening port of the callback server. + * */ + public static void resetCallbackClient(String callbackServerListeningAddress, int callbackServerListeningPort) throws + UnknownHostException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, + NoSuchFieldException { + + gatewayServer = getGatewayServer(); + gatewayServer.resetCallbackClient(InetAddress.getByName(callbackServerListeningAddress), callbackServerListeningPort); + resetCallbackClientExecutorService(gatewayServer); + } + + /** * Py4J both supports Java to Python RPC and Python to Java RPC. The GatewayServer object is * the entry point of Java to Python RPC. Since the Py4j Python client will only be launched * only once, the GatewayServer object needs to be reused. @@ -313,7 +344,7 @@ final class PythonEnvUtils { static void setGatewayServer(GatewayServer gatewayServer) { Preconditions.checkArgument(gatewayServer == null || PythonEnvUtils.gatewayServer == null); - PythonEnvUtils.gatewayServer = null; + PythonEnvUtils.gatewayServer = gatewayServer; } static Process launchPy4jPythonClient( @@ -326,8 +357,6 @@ final class PythonEnvUtils { config, entryPointScript, tmpDir); // set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python process. pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort())); - // set env variable PYFLINK_CALLBACK_PORT for creating callback server in python process. - pythonEnv.systemEnv.put("PYFLINK_CALLBACK_PORT", String.valueOf(gatewayServer.getCallbackClient().getPort())); // start the python process. return PythonEnvUtils.startPythonProcess(pythonEnv, commands); }