This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 55eb624e5cd Fix interactive environment clean up failure at atexit. (#38526)
55eb624e5cd is described below

commit 55eb624e5cd00e546ab19fc411281a0e5f596142
Author: Shunping Huang <[email protected]>
AuthorDate: Mon May 18 15:58:17 2026 -0400

    Fix interactive environment clean up failure at atexit. (#38526)
    
    * Fix interactive environment clean up failure at atexit.
    
    * Fix failed tests.
    
    * Formatting.
---
 .../runners/interactive/cache_manager.py           | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index 0dc79d4001a..0b756a57369 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -19,6 +19,7 @@
 
 import base64
 import collections
+import logging
 import os
 import tempfile
 from urllib.parse import quote
@@ -29,9 +30,12 @@ from apache_beam import coders
 from apache_beam.io import filesystems
 from apache_beam.io import textio
 from apache_beam.io import tfrecordio
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.testing import test_stream
 from apache_beam.transforms import combiners
 
+_LOGGER = logging.getLogger(__name__)
+
 
 class CacheManager(object):
   """Abstract class for caching PCollections.
@@ -286,13 +290,17 @@ class FileBasedCacheManager(CacheManager):
     return self._reader_class(self._glob_path(*labels))
 
   def cleanup(self):
-    if self._cache_dir.startswith('gs://'):
-      from apache_beam.io.gcp import gcsfilesystem
-      from apache_beam.options.pipeline_options import PipelineOptions
-      fs = gcsfilesystem.GCSFileSystem(PipelineOptions())
-      fs.delete([self._cache_dir + '/full/'])
-    elif filesystems.FileSystems.exists(self._cache_dir):
-      filesystems.FileSystems.delete([self._cache_dir])
+    try:
+      if self._cache_dir.startswith('gs://'):
+        # Import GCP dependencies only when needed.
+        from apache_beam.io.gcp import gcsfilesystem
+        fs = gcsfilesystem.GCSFileSystem(PipelineOptions())
+        fs.delete([self._cache_dir + '/full/'])
+      elif filesystems.FileSystems.exists(self._cache_dir):
+        filesystems.FileSystems.delete([self._cache_dir])
+    except Exception as e:
+      _LOGGER.warning(
+          'Failed to clean up cache directory %s: %s', self._cache_dir, e)
     self._saved_pcoders = {}
 
   def _glob_path(self, *labels):

Reply via email to