This is an automated email from the ASF dual-hosted git repository.
cvandermerwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c9ab4b6ae8d Add default_pickle_library_override delegation to
InteractiveRunner (#37752)
c9ab4b6ae8d is described below
commit c9ab4b6ae8d38649fc66798a702938bd035fc8d8
Author: Praneet Nadella <[email protected]>
AuthorDate: Tue Mar 3 18:05:55 2026 -0800
Add default_pickle_library_override delegation to InteractiveRunner (#37752)
* Add default_pickle_library_override delegation to InteractiveRunner
* pylint
---
.../runners/interactive/interactive_runner.py | 6 ++++++
.../runners/interactive/interactive_runner_test.py | 20 ++++++++++++++++++++
2 files changed, 26 insertions(+)
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index 241dcf388dd..737a06c49b3 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -93,6 +93,12 @@ class InteractiveRunner(runners.PipelineRunner):
# return self._underlying_runner.is_fnapi_compatible()
return False
+ def default_pickle_library_override(self):
+ """Delegates pickler override to the underlying runner."""
+ if hasattr(self._underlying_runner, 'default_pickle_library_override'):
+ return self._underlying_runner.default_pickle_library_override()
+ return super().default_pickle_library_override()
+
def set_render_option(self, render_option):
"""Sets the rendering option.
diff --git
a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index ed27d9e55e0..dc30155d599 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -43,6 +43,7 @@ from
apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import Da
from apache_beam.runners.interactive.dataproc.types import ClusterMetadata
from apache_beam.runners.interactive.testing.mock_env import isolated_env
from apache_beam.runners.portability.flink_runner import FlinkRunner
+from apache_beam.runners.runner import PipelineRunner
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import IntervalWindow
@@ -532,6 +533,25 @@ class InteractiveRunnerTest(unittest.TestCase):
# Despite (highly redundant) windowing information, the cache is small.
self.assertLess(size, sum(inputs))
+ def test_default_pickle_library_override_delegates(self):
+ mock_underlying = unittest.mock.MagicMock(spec=PipelineRunner)
+ mock_underlying.default_pickle_library_override.return_value =
'cloudpickle'
+
+ runner = interactive_runner.InteractiveRunner(
+ underlying_runner=mock_underlying)
+
+ self.assertEqual(runner.default_pickle_library_override(), 'cloudpickle')
+
+ def test_default_pickle_library_override_fallback(self):
+ mock_underlying = unittest.mock.MagicMock(spec=PipelineRunner)
+ del mock_underlying.default_pickle_library_override
+
+ runner = interactive_runner.InteractiveRunner(
+ underlying_runner=mock_underlying)
+
+ # Should fallback to the base class implementation without crashing
+ self.assertIsNone(runner.default_pickle_library_override())
+
@unittest.skipIf(
not ie.current_env().is_interactive_ready,