This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 55eb624e5cd Fix interactive environment clean up failure at atexit. (#38526)
55eb624e5cd is described below
commit 55eb624e5cd00e546ab19fc411281a0e5f596142
Author: Shunping Huang <[email protected]>
AuthorDate: Mon May 18 15:58:17 2026 -0400
Fix interactive environment clean up failure at atexit. (#38526)
* Fix interactive environment clean up failure at atexit.
* Fix failed tests.
* Formatting.
---
.../runners/interactive/cache_manager.py | 22 +++++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index 0dc79d4001a..0b756a57369 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -19,6 +19,7 @@
import base64
import collections
+import logging
import os
import tempfile
from urllib.parse import quote
@@ -29,9 +30,12 @@ from apache_beam import coders
from apache_beam.io import filesystems
from apache_beam.io import textio
from apache_beam.io import tfrecordio
+from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing import test_stream
from apache_beam.transforms import combiners
+_LOGGER = logging.getLogger(__name__)
+
class CacheManager(object):
"""Abstract class for caching PCollections.
@@ -286,13 +290,17 @@ class FileBasedCacheManager(CacheManager):
return self._reader_class(self._glob_path(*labels))
def cleanup(self):
- if self._cache_dir.startswith('gs://'):
- from apache_beam.io.gcp import gcsfilesystem
- from apache_beam.options.pipeline_options import PipelineOptions
- fs = gcsfilesystem.GCSFileSystem(PipelineOptions())
- fs.delete([self._cache_dir + '/full/'])
- elif filesystems.FileSystems.exists(self._cache_dir):
- filesystems.FileSystems.delete([self._cache_dir])
+ try:
+ if self._cache_dir.startswith('gs://'):
+ # Import GCP dependencies only when needed.
+ from apache_beam.io.gcp import gcsfilesystem
+ fs = gcsfilesystem.GCSFileSystem(PipelineOptions())
+ fs.delete([self._cache_dir + '/full/'])
+ elif filesystems.FileSystems.exists(self._cache_dir):
+ filesystems.FileSystems.delete([self._cache_dir])
+ except Exception as e:
+ _LOGGER.warning(
+ 'Failed to clean up cache directory %s: %s', self._cache_dir, e)
self._saved_pcoders = {}
def _glob_path(self, *labels):