This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push: new 75c88fa4f19 [FLINK-34616][python] Fix python dist dir doesn't clean when open method construct resource has exception. 75c88fa4f19 is described below commit 75c88fa4f19d3f703e0cce3b917a9aa070eadffe Author: Jacky Lau <liuyon...@gmail.com> AuthorDate: Fri Mar 8 09:37:52 2024 +0800 [FLINK-34616][python] Fix python dist dir doesn't clean when open method construct resource has exception. This closes #24462. --- .../env/AbstractPythonEnvironmentManager.java | 63 +++++++++++++--------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java index cfe39175d8b..2f0acfb53dc 100644 --- a/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java +++ b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java @@ -123,9 +123,16 @@ public abstract class AbstractPythonEnvironmentManager implements PythonEnvironm "Could not create the base directory: " + baseDirectory); } - Map<String, String> env = constructEnvironmentVariables(baseDirectory); - installRequirements(baseDirectory, env); - return Tuple2.of(baseDirectory, env); + try { + Map<String, String> env = + constructEnvironmentVariables(baseDirectory); + installRequirements(baseDirectory, env); + return Tuple2.of(baseDirectory, env); + } catch (Throwable e) { + deleteBaseDirectory(baseDirectory); + LOG.warn("Failed to create resource.", e); + throw e; + } }); shutdownHook = ShutdownHookUtil.addShutdownHook( @@ -213,6 +220,32 @@ public abstract class AbstractPythonEnvironmentManager implements PythonEnvironm + "' for storing the generated files of python dependency."); } + private static void deleteBaseDirectory(String baseDirectory) { + int retries = 0; + while (true) { + try { + FileUtils.deleteDirectory(new File(baseDirectory)); + break; + } catch (Throwable t) { + retries++; + if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) { + LOG.warn( + String.format( + "Failed to delete the working directory %s of the Python UDF worker. Retrying...", + baseDirectory), + t); + } else { + LOG.warn( + String.format( + "Failed to delete the working directory %s of the Python UDF worker.", + baseDirectory), + t); + break; + } + } + } + } + private void installRequirements(String baseDirectory, Map<String, String> env) throws IOException { // Directory for storing the installation result of the requirements file. @@ -475,29 +508,7 @@ public abstract class AbstractPythonEnvironmentManager implements PythonEnvironm @Override public void close() throws Exception { - int retries = 0; - while (true) { - try { - FileUtils.deleteDirectory(new File(baseDirectory)); - break; - } catch (Throwable t) { - retries++; - if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) { - LOG.warn( - String.format( - "Failed to delete the working directory %s of the Python UDF worker. Retrying...", - baseDirectory), - t); - } else { - LOG.warn( - String.format( - "Failed to delete the working directory %s of the Python UDF worker.", - baseDirectory), - t); - break; - } - } - } + deleteBaseDirectory(baseDirectory); } } }