hequn8128 commented on a change in pull request #10017: [FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API
URL: https://github.com/apache/flink/pull/10017#discussion_r342377276
########## File path: flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java ##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.util.UnzipUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * The ProcessEnvironmentManager is used to prepare the working directory of the Python UDF worker
+ * and to create the ProcessEnvironment object of the Beam Fn API. It is created when the Python
+ * function runner is configured to run Python UDFs in process mode.
+ */
+@Internal
+public class ProcessEnvironmentManager implements PythonEnvironmentManager {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ProcessEnvironmentManager.class);
+
+	static final String PYTHON_REQUIREMENTS_FILE = "_PYTHON_REQUIREMENTS_FILE";
+	static final String PYTHON_REQUIREMENTS_CACHE = "_PYTHON_REQUIREMENTS_CACHE";
+	static final String PYTHON_REQUIREMENTS_TARGET_DIR = "_PYTHON_REQUIREMENTS_TARGET_DIR";
+	static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";
+
+	static final String PYTHON_TMP_DIR_PREFIX = "python_dist_";
+	static final String PYTHON_REQUIREMENTS_TARGET_DIR_NAME = "python_requirements_target";
+	static final String PYTHON_ARCHIVES_DIR = "python_archives";
+	static final String PYTHON_PATH_FILES_DIR = "python_path_files_dir";
+
+	private PythonDependencyManager dependencyManager;
+	private String pythonTmpDirectoryBase;
+	private String requirementsTargetDirPath;
+	private String pythonWorkingDirectory;
+	private String pythonPathFilesDirectory;
+	private Thread shutdownHook;
+	private Map<String, String> systemEnv;
+
+	static boolean testCopy = false;
+
+	private ProcessEnvironmentManager(
+			PythonDependencyManager dependencyManager,
+			String pythonTmpDirectoryBase,
+			String pythonPathFilesDirectory,
+			String pythonWorkingDirectory,
+			String requirementsTargetDirPath,
+			Map<String, String> systemEnv) {
+		this.dependencyManager = dependencyManager;
+		this.pythonTmpDirectoryBase = pythonTmpDirectoryBase;
+		this.pythonPathFilesDirectory = pythonPathFilesDirectory;
+		this.pythonWorkingDirectory = pythonWorkingDirectory;
+		this.requirementsTargetDirPath = requirementsTargetDirPath;
+		this.systemEnv = systemEnv;
+	}
+
+	@Override
+	public void cleanup() {
+		if (shutdownHook != null) {
+			shutdownHook.run();
+			Runtime.getRuntime().removeShutdownHook(shutdownHook);
+			shutdownHook = null;
+		}
+	}
+
+	@Override
+	public RunnerApi.Environment createEnvironment() {
+		prepareEnvironment();
+		Map<String, String> generatedEnv = generateEnvironmentVariable();
+		String flinkHomePath = systemEnv.get(ConfigConstants.ENV_FLINK_HOME_DIR);
+		String pythonWorkerCommand =
+			flinkHomePath + File.separator + "bin" + File.separator + "pyflink-udf-runner.sh";
+
+		return Environments.createProcessEnvironment(
+			"",
+			"",
+			pythonWorkerCommand,
+			generatedEnv);
+	}
+
+	/**
+	 * Just returns an empty RetrievalToken because no files will be transmitted via the ArtifactService in process mode.
+	 *
+	 * @return The path of the empty RetrievalToken.
+	 */
+	@Override
+	public String createRetrievalToken() throws IOException {
+		if (shutdownHook == null) {

Review comment:
   It's safer to register the shutdown hook in an open method so that we don't need to register it at every place that operates on directories. We can add an `open` method to `PythonEnvironmentManager` and rename `cleanup` to `close`.
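   A minimal sketch of how that lifecycle could look. The `open`/`close` names, the `AutoCloseable` parent, and the `removeGeneratedDirectories` helper are illustrative assumptions, not code from this PR:

```java
import java.io.IOException;

import org.apache.beam.model.pipeline.v1.RunnerApi;

// Hypothetical sketch of the proposed open/close lifecycle; names are assumptions, not PR code.
interface LifecycleAwareEnvironmentManager extends AutoCloseable {

	// Called once before any directory is created; the only place that registers the shutdown hook.
	void open() throws Exception;

	RunnerApi.Environment createEnvironment() throws Exception;

	String createRetrievalToken() throws IOException;

	// Replaces cleanup(): removes the shutdown hook and deletes the generated directories.
	@Override
	void close() throws Exception;
}

abstract class AbstractProcessEnvironmentManager implements LifecycleAwareEnvironmentManager {

	private Thread shutdownHook;

	@Override
	public void open() {
		if (shutdownHook == null) {
			// Register exactly once here, instead of in createRetrievalToken() and friends.
			shutdownHook = new Thread(this::removeGeneratedDirectories);
			Runtime.getRuntime().addShutdownHook(shutdownHook);
		}
	}

	@Override
	public void close() {
		if (shutdownHook != null) {
			// Deregister the hook during normal shutdown and clean up eagerly.
			Runtime.getRuntime().removeShutdownHook(shutdownHook);
			shutdownHook.run();
			shutdownHook = null;
		}
	}

	// Hypothetical helper that deletes the temporary directories created for the Python worker.
	abstract void removeGeneratedDirectories();
}
```

   With this shape, methods such as `createRetrievalToken()` can assume `open()` has already run and never need to touch the shutdown hook themselves.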