This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
     new 75c88fa4f19 [FLINK-34616][python] Fix python dist dir doesn't clean when open method construct resource has exception.
75c88fa4f19 is described below

commit 75c88fa4f19d3f703e0cce3b917a9aa070eadffe
Author: Jacky Lau <liuyon...@gmail.com>
AuthorDate: Fri Mar 8 09:37:52 2024 +0800

    [FLINK-34616][python] Fix python dist dir doesn't clean when open method construct resource has exception.
    
    This closes #24462.
---
 .../env/AbstractPythonEnvironmentManager.java      | 63 +++++++++++++---------
 1 file changed, 37 insertions(+), 26 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
index cfe39175d8b..2f0acfb53dc 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
@@ -123,9 +123,16 @@ public abstract class AbstractPythonEnvironmentManager implements PythonEnvironm
                                         "Could not create the base directory: 
" + baseDirectory);
                             }
 
-                            Map<String, String> env = 
constructEnvironmentVariables(baseDirectory);
-                            installRequirements(baseDirectory, env);
-                            return Tuple2.of(baseDirectory, env);
+                            try {
+                                Map<String, String> env =
+                                        
constructEnvironmentVariables(baseDirectory);
+                                installRequirements(baseDirectory, env);
+                                return Tuple2.of(baseDirectory, env);
+                            } catch (Throwable e) {
+                                deleteBaseDirectory(baseDirectory);
+                                LOG.warn("Failed to create resource.", e);
+                                throw e;
+                            }
                         });
         shutdownHook =
                 ShutdownHookUtil.addShutdownHook(
@@ -213,6 +220,32 @@ public abstract class AbstractPythonEnvironmentManager implements PythonEnvironm
                         + "' for storing the generated files of python 
dependency.");
     }
 
+    private static void deleteBaseDirectory(String baseDirectory) {
+        int retries = 0;
+        while (true) {
+            try {
+                FileUtils.deleteDirectory(new File(baseDirectory));
+                break;
+            } catch (Throwable t) {
+                retries++;
+                if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
+                    LOG.warn(
+                            String.format(
+                                    "Failed to delete the working directory %s 
of the Python UDF worker. Retrying...",
+                                    baseDirectory),
+                            t);
+                } else {
+                    LOG.warn(
+                            String.format(
+                                    "Failed to delete the working directory %s 
of the Python UDF worker.",
+                                    baseDirectory),
+                            t);
+                    break;
+                }
+            }
+        }
+    }
+
     private void installRequirements(String baseDirectory, Map<String, String> 
env)
             throws IOException {
         // Directory for storing the installation result of the requirements 
file.
@@ -475,29 +508,7 @@ public abstract class AbstractPythonEnvironmentManager implements PythonEnvironm
 
         @Override
         public void close() throws Exception {
-            int retries = 0;
-            while (true) {
-                try {
-                    FileUtils.deleteDirectory(new File(baseDirectory));
-                    break;
-                } catch (Throwable t) {
-                    retries++;
-                    if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
-                        LOG.warn(
-                                String.format(
-                                        "Failed to delete the working 
directory %s of the Python UDF worker. Retrying...",
-                                        baseDirectory),
-                                t);
-                    } else {
-                        LOG.warn(
-                                String.format(
-                                        "Failed to delete the working 
directory %s of the Python UDF worker.",
-                                        baseDirectory),
-                                t);
-                        break;
-                    }
-                }
-            }
+            deleteBaseDirectory(baseDirectory);
         }
     }
 }

Reply via email to