This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7b5962e747 Refactor cloudpickle support in Python operators/decorators 
(#39270)
7b5962e747 is described below

commit 7b5962e747576ccae7798e1d9c6c1398086efd29
Author: Andrey Anshin <andrey.ans...@taragol.is>
AuthorDate: Sat May 4 22:34:19 2024 +0400

    Refactor cloudpickle support in Python operators/decorators (#39270)
    
    * Refactor cloudpickle support in Python operators/decorators
    
    * Fixup missing marker
    
    * Return back skip TestPythonVirtualenvOperator::test_airflow_context for 
dill
    
    * TestPythonVirtualenvOperator::test_airflow_context xfail instead of skip
    
    * Catch only on ModuleNotFound error and simple reraise with warning
    
    * Limit test_airflow_context only for python 3.11
---
 airflow/decorators/__init__.pyi                    |  61 +++-
 .../tutorial_taskflow_api_virtualenv.py            |   2 +-
 airflow/operators/python.py                        | 153 +++++---
 tests/decorators/test_branch_virtualenv.py         |   2 +-
 tests/decorators/test_external_python.py           | 227 ++++++------
 tests/decorators/test_python_virtualenv.py         | 229 ++++++------
 tests/operators/test_python.py                     | 405 ++++++++++-----------
 7 files changed, 566 insertions(+), 513 deletions(-)

diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi
index a10a1cf39e..e88a535db5 100644
--- a/airflow/decorators/__init__.pyi
+++ b/airflow/decorators/__init__.pyi
@@ -111,7 +111,7 @@ class TaskDecoratorCollection:
         # _PythonVirtualenvDecoratedOperator.
         requirements: None | Iterable[str] | str = None,
         python_version: None | str | int | float = None,
-        use_dill: bool = False,
+        serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
         system_site_packages: bool = True,
         templates_dict: Mapping[str, Any] | None = None,
         pip_install_options: list[str] | None = None,
@@ -119,6 +119,7 @@ class TaskDecoratorCollection:
         index_urls: None | Collection[str] | str = None,
         venv_cache_path: None | str = None,
         show_return_value_in_logs: bool = True,
+        use_dill: bool = False,
         **kwargs,
     ) -> TaskDecorator:
         """Create a decorator to convert the decorated callable to a virtual 
environment task.
@@ -129,6 +130,13 @@ class TaskDecoratorCollection:
             "requirements file" as specified by pip.
         :param python_version: The Python version to run the virtual 
environment with. Note that
             both 2 and 2.7 are acceptable forms.
+        :param serializer: Which serializer use to serialize the args and 
result. It can be one of the following:
+
+            - ``"pickle"``: (default) Use pickle for serialization. Included 
in the Python Standard Library.
+            - ``"cloudpickle"``: Use cloudpickle for serialize more complex 
types,
+              this requires to include cloudpickle in your requirements.
+            - ``"dill"``: Use dill for serialize more complex types,
+              this requires to include dill in your requirements.
         :param use_dill: Whether to use dill to serialize
             the args and result (pickle is default). This allow more complex 
types
             but requires you to include dill in your requirements.
@@ -154,6 +162,9 @@ class TaskDecoratorCollection:
             logs. Defaults to True, which allows return value log output.
             It can be set to False to prevent log output of return value when 
you return huge data
             such as transmission a large amount of XCom to TaskAPI.
+        :param use_dill: Deprecated, use ``serializer`` instead. Whether to 
use dill to serialize
+            the args and result (pickle is default). This allows more complex 
types
+            but requires you to include dill in your requirements.
         """
     @overload
     def virtualenv(self, python_callable: Callable[FParams, FReturn]) -> 
Task[FParams, FReturn]: ...
@@ -164,9 +175,10 @@ class TaskDecoratorCollection:
         multiple_outputs: bool | None = None,
         # 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
         # _PythonVirtualenvDecoratedOperator.
-        use_dill: bool = False,
+        serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
         templates_dict: Mapping[str, Any] | None = None,
         show_return_value_in_logs: bool = True,
+        use_dill: bool = False,
         **kwargs,
     ) -> TaskDecorator:
         """Create a decorator to convert the decorated callable to a virtual 
environment task.
@@ -176,9 +188,13 @@ class TaskDecoratorCollection:
             (so usually start with "/" or "X:/" depending on the filesystem/os 
used).
         :param multiple_outputs: If set, function return value will be 
unrolled to multiple XCom values.
             Dict will unroll to XCom values with keys as XCom keys. Defaults 
to False.
-        :param use_dill: Whether to use dill to serialize
-            the args and result (pickle is default). This allow more complex 
types
-            but requires you to include dill in your requirements.
+        :param serializer: Which serializer use to serialize the args and 
result. It can be one of the following:
+
+            - ``"pickle"``: (default) Use pickle for serialization. Included 
in the Python Standard Library.
+            - ``"cloudpickle"``: Use cloudpickle for serialize more complex 
types,
+              this requires to include cloudpickle in your requirements.
+            - ``"dill"``: Use dill for serialize more complex types,
+              this requires to include dill in your requirements.
         :param templates_dict: a dictionary where the values are templates that
             will get templated by the Airflow engine sometime between
             ``__init__`` and ``execute`` takes place and are made available
@@ -187,6 +203,9 @@ class TaskDecoratorCollection:
             logs. Defaults to True, which allows return value log output.
             It can be set to False to prevent log output of return value when 
you return huge data
             such as transmission a large amount of XCom to TaskAPI.
+        :param use_dill: Deprecated, use ``serializer`` instead. Whether to 
use dill to serialize
+            the args and result (pickle is default). This allows more complex 
types
+            but requires you to include dill in your requirements.
         """
     @overload
     def branch(  # type: ignore[misc]
@@ -211,7 +230,7 @@ class TaskDecoratorCollection:
         # _PythonVirtualenvDecoratedOperator.
         requirements: None | Iterable[str] | str = None,
         python_version: None | str | int | float = None,
-        use_dill: bool = False,
+        serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
         system_site_packages: bool = True,
         templates_dict: Mapping[str, Any] | None = None,
         pip_install_options: list[str] | None = None,
@@ -219,6 +238,7 @@ class TaskDecoratorCollection:
         index_urls: None | Collection[str] | str = None,
         venv_cache_path: None | str = None,
         show_return_value_in_logs: bool = True,
+        use_dill: bool = False,
         **kwargs,
     ) -> TaskDecorator:
         """Create a decorator to wrap the decorated callable into a 
BranchPythonVirtualenvOperator.
@@ -232,9 +252,13 @@ class TaskDecoratorCollection:
             "requirements file" as specified by pip.
         :param python_version: The Python version to run the virtual 
environment with. Note that
             both 2 and 2.7 are acceptable forms.
-        :param use_dill: Whether to use dill to serialize
-            the args and result (pickle is default). This allow more complex 
types
-            but requires you to include dill in your requirements.
+        :param serializer: Which serializer use to serialize the args and 
result. It can be one of the following:
+
+            - ``"pickle"``: (default) Use pickle for serialization. Included 
in the Python Standard Library.
+            - ``"cloudpickle"``: Use cloudpickle for serialize more complex 
types,
+              this requires to include cloudpickle in your requirements.
+            - ``"dill"``: Use dill for serialize more complex types,
+              this requires to include dill in your requirements.
         :param system_site_packages: Whether to include
             system_site_packages in your virtual environment.
             See virtualenv documentation for more information.
@@ -253,6 +277,9 @@ class TaskDecoratorCollection:
             logs. Defaults to True, which allows return value log output.
             It can be set to False to prevent log output of return value when 
you return huge data
             such as transmission a large amount of XCom to TaskAPI.
+        :param use_dill: Deprecated, use ``serializer`` instead. Whether to 
use dill to serialize
+            the args and result (pickle is default). This allows more complex 
types
+            but requires you to include dill in your requirements.
         """
     @overload
     def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) 
-> Task[FParams, FReturn]: ...
@@ -264,9 +291,10 @@ class TaskDecoratorCollection:
         multiple_outputs: bool | None = None,
         # 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
         # _PythonVirtualenvDecoratedOperator.
-        use_dill: bool = False,
+        serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
         templates_dict: Mapping[str, Any] | None = None,
         show_return_value_in_logs: bool = True,
+        use_dill: bool = False,
         **kwargs,
     ) -> TaskDecorator:
         """Create a decorator to wrap the decorated callable into a 
BranchExternalPythonOperator.
@@ -279,9 +307,13 @@ class TaskDecoratorCollection:
             (so usually start with "/" or "X:/" depending on the filesystem/os 
used).
         :param multiple_outputs: If set, function return value will be 
unrolled to multiple XCom values.
             Dict will unroll to XCom values with keys as XCom keys. Defaults 
to False.
-        :param use_dill: Whether to use dill to serialize
-            the args and result (pickle is default). This allow more complex 
types
-            but requires you to include dill in your requirements.
+        :param serializer: Which serializer use to serialize the args and 
result. It can be one of the following:
+
+            - ``"pickle"``: (default) Use pickle for serialization. Included 
in the Python Standard Library.
+            - ``"cloudpickle"``: Use cloudpickle for serialize more complex 
types,
+              this requires to include cloudpickle in your requirements.
+            - ``"dill"``: Use dill for serialize more complex types,
+              this requires to include dill in your requirements.
         :param templates_dict: a dictionary where the values are templates that
             will get templated by the Airflow engine sometime between
             ``__init__`` and ``execute`` takes place and are made available
@@ -290,6 +322,9 @@ class TaskDecoratorCollection:
             logs. Defaults to True, which allows return value log output.
             It can be set to False to prevent log output of return value when 
you return huge data
             such as transmission a large amount of XCom to TaskAPI.
+        :param use_dill: Deprecated, use ``serializer`` instead. Whether to 
use dill to serialize
+            the args and result (pickle is default). This allows more complex 
types
+            but requires you to include dill in your requirements.
         """
     @overload
     def branch_external_python(
diff --git a/airflow/example_dags/tutorial_taskflow_api_virtualenv.py 
b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py
index 44134e4458..3860876e6e 100644
--- a/airflow/example_dags/tutorial_taskflow_api_virtualenv.py
+++ b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py
@@ -38,7 +38,7 @@ else:
         """
 
         @task.virtualenv(
-            use_dill=True,
+            serializer="dill",  # Use `dill` for advanced serialization.
             system_site_packages=False,
             requirements=["funcsigs"],
         )
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 2368d78d80..977ef54ecb 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -23,7 +23,6 @@ import inspect
 import json
 import logging
 import os
-import pickle
 import shutil
 import subprocess
 import sys
@@ -36,6 +35,8 @@ from pathlib import Path
 from tempfile import TemporaryDirectory
 from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, 
Mapping, NamedTuple, Sequence, cast
 
+import lazy_object_proxy
+
 from airflow.compat.functools import cache
 from airflow.exceptions import (
     AirflowConfigException,
@@ -49,6 +50,7 @@ from airflow.models.skipmixin import SkipMixin
 from airflow.models.taskinstance import _CURRENT_CONTEXT
 from airflow.models.variable import Variable
 from airflow.operators.branch import BranchMixIn
+from airflow.typing_compat import Literal
 from airflow.utils import hashlib_wrapper
 from airflow.utils.context import context_copy_partial, 
context_get_dataset_events, context_merge
 from airflow.utils.file import get_unique_dag_module_name
@@ -58,13 +60,6 @@ from airflow.utils.python_virtualenv import 
prepare_virtualenv, write_python_scr
 
 log = logging.getLogger(__name__)
 
-if shutil.which("cloudpickle") or importlib.util.find_spec("cloudpickle"):
-    import cloudpickle as serialization_library
-elif shutil.which("dill") or importlib.util.find_spec("dill"):
-    import dill as serialization_library
-else:
-    log.debug("Neither dill and cloudpickle are installed. Please install one 
with: pip install [name]")
-
 if TYPE_CHECKING:
     from pendulum.datetime import DateTime
 
@@ -350,6 +345,41 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
         return condition
 
 
+def _load_pickle():
+    import pickle
+
+    return pickle
+
+
+def _load_dill():
+    try:
+        import dill
+    except ModuleNotFoundError:
+        log.error("Unable to import `dill` module. Please please make sure 
that it installed.")
+        raise
+    return dill
+
+
+def _load_cloudpickle():
+    try:
+        import cloudpickle
+    except ModuleNotFoundError:
+        log.error(
+            "Unable to import `cloudpickle` module. "
+            "Please install it with: pip install 'apache-airflow[cloudpickle]'"
+        )
+        raise
+    return cloudpickle
+
+
+_SerializerTypeDef = Literal["pickle", "cloudpickle", "dill"]
+_SERIALIZERS: dict[_SerializerTypeDef, Any] = {
+    "pickle": lazy_object_proxy.Proxy(_load_pickle),
+    "dill": lazy_object_proxy.Proxy(_load_dill),
+    "cloudpickle": lazy_object_proxy.Proxy(_load_cloudpickle),
+}
+
+
 class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
     BASE_SERIALIZABLE_CONTEXT_KEYS = {
         "ds",
@@ -400,8 +430,7 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
         self,
         *,
         python_callable: Callable,
-        use_dill: bool = False,
-        use_cloudpickle: bool = False,
+        serializer: _SerializerTypeDef | None = None,
         op_args: Collection[Any] | None = None,
         op_kwargs: Mapping[str, Any] | None = None,
         string_args: Iterable[str] | None = None,
@@ -409,6 +438,7 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
         templates_exts: list[str] | None = None,
         expect_airflow: bool = True,
         skip_on_exit_code: int | Container[int] | None = None,
+        use_dill: bool = False,
         **kwargs,
     ):
         if (
@@ -428,15 +458,29 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
             **kwargs,
         )
         self.string_args = string_args or []
-        if use_dill and use_cloudpickle:
-            raise AirflowException(
-                "Both 'use_dill' and 'use_cloudpickle' parameters are set to 
True. Please,"
-                " choose only one."
-            )
+
         if use_dill:
-            use_cloudpickle = use_dill
-        self.use_cloudpickle = use_cloudpickle
-        self.pickling_library = serialization_library if self.use_cloudpickle 
else pickle
+            warnings.warn(
+                "`use_dill` is deprecated and will be removed in a future 
version. "
+                "Please provide serializer='dill' instead.",
+                RemovedInAirflow3Warning,
+                stacklevel=3,
+            )
+            if serializer:
+                raise AirflowException(
+                    "Both 'use_dill' and 'serializer' parameters are set. 
Please set only one of them"
+                )
+            serializer = "dill"
+        serializer = serializer or "pickle"
+        if serializer not in _SERIALIZERS:
+            msg = (
+                f"Unsupported serializer {serializer!r}. "
+                f"Expected one of {', '.join(map(repr, _SERIALIZERS))}"
+            )
+            raise AirflowException(msg)
+        self.pickling_library = _SERIALIZERS[serializer]
+        self.serializer: _SerializerTypeDef = serializer
+
         self.expect_airflow = expect_airflow
         self.skip_on_exit_code = (
             skip_on_exit_code
@@ -461,6 +505,7 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
 
     def _write_args(self, file: Path):
         if self.op_args or self.op_kwargs:
+            self.log.info("Use %r as serializer.", self.serializer)
             file.write_bytes(self.pickling_library.dumps({"args": 
self.op_args, "kwargs": self.op_kwargs}))
 
     def _write_string_args(self, file: Path):
@@ -498,7 +543,7 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
                 "op_args": self.op_args,
                 "op_kwargs": op_kwargs,
                 "expect_airflow": self.expect_airflow,
-                "pickling_library": self.pickling_library.__name__,
+                "pickling_library": self.serializer,
                 "python_callable": self.python_callable.__name__,
                 "python_callable_source": self.get_python_source(),
             }
@@ -567,12 +612,13 @@ class 
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
         "requirements file" as specified by pip.
     :param python_version: The Python version to run the virtual environment 
with. Note that
         both 2 and 2.7 are acceptable forms.
-    :param use_dill: Whether to use dill to serialize
-        the args and result (pickle is default). This allow more complex types
-        but requires you to include dill in your requirements.
-    :param use_cloudpickle: Whether to use cloudpickle to serialize
-        the args and result (pickle is default). This allows more complex types
-        but requires you to include cloudpickle in your requirements.
+    :param serializer: Which serializer use to serialize the args and result. 
It can be one of the following:
+
+        - ``"pickle"``: (default) Use pickle for serialization. Included in 
the Python Standard Library.
+        - ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
+          this requires to include cloudpickle in your requirements.
+        - ``"dill"``: Use dill for serialize more complex types,
+          this requires to include dill in your requirements.
     :param system_site_packages: Whether to include
         system_site_packages in your virtual environment.
         See virtualenv documentation for more information.
@@ -601,6 +647,9 @@ class 
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
         virtual environment will be cached, creates a sub-folder venv-{hash} 
whereas hash will be replaced
         with a checksum of requirements. If not provided the virtual 
environment will be created and deleted
         in a temp folder for every execution.
+    :param use_dill: Deprecated, use ``serializer`` instead. Whether to use 
dill to serialize
+        the args and result (pickle is default). This allows more complex types
+        but requires you to include dill in your requirements.
     """
 
     template_fields: Sequence[str] = tuple(
@@ -614,8 +663,7 @@ class 
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
         python_callable: Callable,
         requirements: None | Iterable[str] | str = None,
         python_version: str | None = None,
-        use_dill: bool = False,
-        use_cloudpickle: bool = False,
+        serializer: _SerializerTypeDef | None = None,
         system_site_packages: bool = True,
         pip_install_options: list[str] | None = None,
         op_args: Collection[Any] | None = None,
@@ -627,6 +675,7 @@ class 
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
         skip_on_exit_code: int | Container[int] | None = None,
         index_urls: None | Collection[str] | str = None,
         venv_cache_path: None | os.PathLike[str] = None,
+        use_dill: bool = False,
         **kwargs,
     ):
         if (
@@ -646,13 +695,6 @@ class 
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
                 RemovedInAirflow3Warning,
                 stacklevel=2,
             )
-        if use_dill and use_cloudpickle:
-            raise AirflowException(
-                "Both 'use_dill' and 'use_cloudpickle' parameters are set to 
True. Please, "
-                "choose only one."
-            )
-        if use_dill:
-            use_cloudpickle = use_dill
         if not is_venv_installed():
             raise AirflowException("PythonVirtualenvOperator requires 
virtualenv, please install it.")
         if not requirements:
@@ -673,7 +715,7 @@ class 
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
         self.venv_cache_path = venv_cache_path
         super().__init__(
             python_callable=python_callable,
-            use_cloudpickle=use_cloudpickle,
+            serializer=serializer,
             op_args=op_args,
             op_kwargs=op_kwargs,
             string_args=string_args,
@@ -681,15 +723,22 @@ class 
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
             templates_exts=templates_exts,
             expect_airflow=expect_airflow,
             skip_on_exit_code=skip_on_exit_code,
+            use_dill=use_dill,
             **kwargs,
         )
 
     def _requirements_list(self, exclude_cloudpickle: bool = False) -> 
list[str]:
         """Prepare a list of requirements that need to be installed for the 
virtual environment."""
         requirements = [str(dependency) for dependency in self.requirements]
-        if not exclude_cloudpickle:
-            if not self.system_site_packages and self.use_cloudpickle and 
"cloudpickle" not in requirements:
+        if not self.system_site_packages:
+            if (
+                self.serializer == "cloudpickle"
+                and not exclude_cloudpickle
+                and "cloudpickle" not in requirements
+            ):
                 requirements.append("cloudpickle")
+            elif self.serializer == "dill" and "dill" not in requirements:
+                requirements.append("dill")
         requirements.sort()  # Ensure a hash is stable
         return requirements
 
@@ -856,13 +905,13 @@ class 
ExternalPythonOperator(_BasePythonVirtualenvOperator):
         (so usually start with "/" or "X:/" depending on the filesystem/os 
used).
     :param python_callable: A python function with no references to outside 
variables,
         defined with def, which will be run in a virtual environment.
-    :param use_dill: Whether to use dill to serialize
-        the args and result (pickle is default). This allow more complex types
-        but requires you to include dill in your requirements.
-    :param use_cloudpickle: Whether to use cloudpickle to serialize
-        the args and result (pickle is default). This allows more complex types
-        but if cloudpickle is not preinstalled in your virtual environment, 
the task will fail
-        with use_cloudpickle enabled.
+    :param serializer: Which serializer use to serialize the args and result. 
It can be one of the following:
+
+        - ``"pickle"``: (default) Use pickle for serialization. Included in 
the Python Standard Library.
+        - ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
+          this requires to include cloudpickle in your requirements.
+        - ``"dill"``: Use dill for serialize more complex types,
+          this requires to include dill in your requirements.
     :param op_args: A list of positional arguments to pass to python_callable.
     :param op_kwargs: A dict of keyword arguments to pass to python_callable.
     :param string_args: Strings that are present in the global var 
virtualenv_string_args,
@@ -880,6 +929,9 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
     :param skip_on_exit_code: If python_callable exits with this exit code, 
leave the task
         in ``skipped`` state (default: None). If set to ``None``, any non-zero
         exit code will be treated as a failure.
+    :param use_dill: Deprecated, use ``serializer`` instead. Whether to use 
dill to serialize
+        the args and result (pickle is default). This allows more complex types
+        but requires you to include dill in your requirements.
     """
 
     template_fields: Sequence[str] = 
tuple({"python"}.union(PythonOperator.template_fields))
@@ -889,8 +941,7 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
         *,
         python: str,
         python_callable: Callable,
-        use_dill: bool = False,
-        use_cloudpickle: bool = False,
+        serializer: _SerializerTypeDef | None = None,
         op_args: Collection[Any] | None = None,
         op_kwargs: Mapping[str, Any] | None = None,
         string_args: Iterable[str] | None = None,
@@ -899,21 +950,16 @@ class 
ExternalPythonOperator(_BasePythonVirtualenvOperator):
         expect_airflow: bool = True,
         expect_pendulum: bool = False,
         skip_on_exit_code: int | Container[int] | None = None,
+        use_dill: bool = False,
         **kwargs,
     ):
         if not python:
             raise ValueError("Python Path must be defined in 
ExternalPythonOperator")
-        if use_dill and use_cloudpickle:
-            raise AirflowException(
-                "Both 'use_dill' and 'use_cloudpickle' parameters are set to 
True. Please, choose only one."
-            )
-        if use_dill:
-            use_cloudpickle = use_dill
         self.python = python
         self.expect_pendulum = expect_pendulum
         super().__init__(
             python_callable=python_callable,
-            use_cloudpickle=use_cloudpickle,
+            serializer=serializer,
             op_args=op_args,
             op_kwargs=op_kwargs,
             string_args=string_args,
@@ -921,6 +967,7 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
             templates_exts=templates_exts,
             expect_airflow=expect_airflow,
             skip_on_exit_code=skip_on_exit_code,
+            use_dill=use_dill,
             **kwargs,
         )
 
diff --git a/tests/decorators/test_branch_virtualenv.py 
b/tests/decorators/test_branch_virtualenv.py
index 57db52f167..d38a157632 100644
--- a/tests/decorators/test_branch_virtualenv.py
+++ b/tests/decorators/test_branch_virtualenv.py
@@ -25,7 +25,7 @@ from airflow.utils.state import State
 pytestmark = pytest.mark.db_test
 
 
-class Test_BranchPythonVirtualenvDecoratedOperator:
+class TestBranchPythonVirtualenvDecoratedOperator:
     # when run in "Parallel" test run environment, sometimes this test runs 
for a long time
     # because creating virtualenv and starting new Python interpreter creates 
a lot of IO/contention
     # possibilities. So we are increasing the timeout for this test to 3x of 
the default timeout
diff --git a/tests/decorators/test_external_python.py 
b/tests/decorators/test_external_python.py
index fe5c76101c..034a51166a 100644
--- a/tests/decorators/test_external_python.py
+++ b/tests/decorators/test_external_python.py
@@ -18,21 +18,17 @@
 from __future__ import annotations
 
 import datetime
-import logging
 import subprocess
 import venv
 from datetime import timedelta
-from pathlib import Path
+from importlib.util import find_spec
 from subprocess import CalledProcessError
-from tempfile import TemporaryDirectory
 
 import pytest
 
 from airflow.decorators import setup, task, teardown
 from airflow.utils import timezone
 
-log = logging.getLogger(__name__)
-
 pytestmark = pytest.mark.db_test
 
 
@@ -40,6 +36,10 @@ DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 END_DATE = timezone.datetime(2016, 1, 2)
 INTERVAL = timedelta(hours=12)
 FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
+DILL_INSTALLED = find_spec("dill") is not None
+DILL_MARKER = pytest.mark.skipif(not DILL_INSTALLED, reason="`dill` is not 
installed")
+CLOUDPICKLE_INSTALLED = find_spec("cloudpickle") is not None
+CLOUDPICKLE_MARKER = pytest.mark.skipif(not CLOUDPICKLE_INSTALLED, 
reason="`cloudpickle` is not installed")
 
 TI_CONTEXT_ENV_VARS = [
     "AIRFLOW_CTX_DAG_ID",
@@ -49,114 +49,75 @@ TI_CONTEXT_ENV_VARS = [
 ]
 
 
-@pytest.fixture
-def venv_python():
-    with TemporaryDirectory() as d:
-        venv.create(d, with_pip=False)
-        yield Path(d) / "bin" / "python"
+@pytest.fixture(scope="module")
+def venv_python(tmp_path_factory):
+    venv_dir = tmp_path_factory.mktemp("venv")
+    venv.create(venv_dir, with_pip=False)
+    return (venv_dir / "bin" / "python").resolve(strict=True).as_posix()
 
 
-@pytest.fixture
-def venv_python_with_cloudpickle_and_dill():
-    with TemporaryDirectory() as d:
-        venv.create(d, with_pip=True)
-        python_path = Path(d) / "bin" / "python"
-        subprocess.call([python_path, "-m", "pip", "install", "cloudpickle", 
"dill"])
-        yield python_path
+@pytest.fixture(scope="module")
+def venv_python_with_cloudpickle_and_dill(tmp_path_factory):
+    venv_dir = tmp_path_factory.mktemp("venv_serializers")
+    venv.create(venv_dir, with_pip=True)
+    python_path = (venv_dir / "bin" / "python").resolve(strict=True).as_posix()
+    subprocess.call([python_path, "-m", "pip", "install", "cloudpickle", 
"dill"])
+    return python_path
 
 
 class TestExternalPythonDecorator:
-    def test_with_cloudpickle_works(self, dag_maker, 
venv_python_with_cloudpickle_and_dill):
-        @task.external_python(python=venv_python_with_cloudpickle_and_dill, 
use_cloudpickle=True)
-        def f():
-            """Import cloudpickle to double-check it is installed ."""
-            try:
-                import cloudpickle  # noqa: F401
-            except ImportError:
-                log.warning(
-                    "Cloudpickle package is required to be installed."
-                    " Please install it with: pip install [cloudpickle]"
-                )
-
-        with dag_maker():
-            ret = f()
-
-        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
-    def test_with_templated_python_cloudpickle(self, dag_maker, 
venv_python_with_cloudpickle_and_dill):
-        # add template that produces empty string when rendered
-        templated_python_with_cloudpickle = 
venv_python_with_cloudpickle_and_dill.as_posix() + "{{ '' }}"
-
-        @task.external_python(python=templated_python_with_cloudpickle, 
use_cloudpickle=True)
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+        ],
+    )
+    def test_with_serializer_works(self, serializer, dag_maker, 
venv_python_with_cloudpickle_and_dill):
+        @task.external_python(python=venv_python_with_cloudpickle_and_dill, 
serializer=serializer)
         def f():
-            """Import cloudpickle to double-check it is installed ."""
-            try:
-                import cloudpickle  # noqa: F401
-            except ImportError:
-                log.warning(
-                    "Cloudpickle package is required to be installed."
-                    " Please install it with: pip install [cloudpickle]"
-                )
+            """Import cloudpickle/dill to double-check it is installed ."""
+            import cloudpickle  # noqa: F401
+            import dill  # noqa: F401
 
         with dag_maker():
             ret = f()
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def 
test_no_cloudpickle_installed_raises_exception_when_use_cloudpickle(self, 
dag_maker, venv_python):
-        @task.external_python(python=venv_python, use_cloudpickle=True)
-        def f():
-            pass
-
-        with dag_maker():
-            ret = f()
-
-        with pytest.raises(CalledProcessError):
-            ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
-    def test_with_dill_works(self, dag_maker, 
venv_python_with_cloudpickle_and_dill):
-        @task.external_python(python=venv_python_with_cloudpickle_and_dill, 
use_dill=True)
-        def f():
-            """Import dill to double-check it is installed ."""
-            try:
-                import dill  # noqa: F401
-            except ImportError:
-                import logging
-
-                _log = logging.getLogger(__name__)
-                _log.warning(
-                    "Dill package is required to be installed. Please install 
it with: pip install [dill]"
-                )
-
-        with dag_maker():
-            ret = f()
-
-        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
-    def test_with_templated_python_dill(self, dag_maker, 
venv_python_with_cloudpickle_and_dill):
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+        ],
+    )
+    def test_with_templated_python_serializer(
+        self, serializer, dag_maker, venv_python_with_cloudpickle_and_dill
+    ):
         # add template that produces empty string when rendered
-        templated_python_with_dill = 
venv_python_with_cloudpickle_and_dill.as_posix() + "{{ '' }}"
+        templated_python_with_cloudpickle = 
venv_python_with_cloudpickle_and_dill + "{{ '' }}"
 
-        @task.external_python(python=templated_python_with_dill, use_dill=True)
+        @task.external_python(python=templated_python_with_cloudpickle, 
serializer=serializer)
         def f():
-            """Import dill to double-check it is installed ."""
-            try:
-                import dill  # noqa: F401
-            except ImportError:
-                import logging
-
-                _log = logging.getLogger(__name__)
-                _log.warning(
-                    "Dill package is required to be installed. Please install 
it with: pip install [dill]"
-                )
+            """Import cloudpickle/dill to double-check it is installed ."""
+            import cloudpickle  # noqa: F401
+            import dill  # noqa: F401
 
         with dag_maker():
             ret = f()
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_no_dill_installed_raises_exception_when_use_dill(self, dag_maker, 
venv_python):
-        @task.external_python(python=venv_python, use_dill=True)
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+        ],
+    )
+    def test_no_advanced_serializer_installed(self, serializer, dag_maker, 
venv_python):
+        @task.external_python(python=venv_python, serializer=serializer)
         def f():
             pass
 
@@ -177,8 +138,17 @@ class TestExternalPythonDecorator:
         with pytest.raises(CalledProcessError):
             ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_with_args(self, dag_maker, venv_python):
-        @task.external_python(python=venv_python)
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("pickle", id="pickle"),
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+            pytest.param(None, id="default"),
+        ],
+    )
+    def test_with_args(self, serializer, dag_maker, 
venv_python_with_cloudpickle_and_dill):
+        @task.external_python(python=venv_python_with_cloudpickle_and_dill, 
serializer=serializer)
         def f(a, b, c=False, d=False):
             if a == 0 and b == 1 and c and not d:
                 return True
@@ -190,8 +160,17 @@ class TestExternalPythonDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_return_none(self, dag_maker, venv_python):
-        @task.external_python(python=venv_python)
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("pickle", id="pickle"),
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+            pytest.param(None, id="default"),
+        ],
+    )
+    def test_return_none(self, serializer, dag_maker, 
venv_python_with_cloudpickle_and_dill):
+        @task.external_python(python=venv_python_with_cloudpickle_and_dill, 
serializer=serializer)
         def f():
             return None
 
@@ -200,8 +179,17 @@ class TestExternalPythonDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_nonimported_as_arg(self, dag_maker, venv_python):
-        @task.external_python(python=venv_python)
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("pickle", id="pickle"),
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+            pytest.param(None, id="default"),
+        ],
+    )
+    def test_nonimported_as_arg(self, serializer, dag_maker, 
venv_python_with_cloudpickle_and_dill):
+        @task.external_python(python=venv_python_with_cloudpickle_and_dill, 
serializer=serializer)
         def f(_):
             return None
 
@@ -210,9 +198,20 @@ class TestExternalPythonDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_marking_external_python_task_as_setup(self, dag_maker, 
venv_python):
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("pickle", id="pickle"),
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+            pytest.param(None, id="default"),
+        ],
+    )
+    def test_marking_external_python_task_as_setup(
+        self, serializer, dag_maker, venv_python_with_cloudpickle_and_dill
+    ):
         @setup
-        @task.external_python(python=venv_python)
+        @task.external_python(python=venv_python_with_cloudpickle_and_dill, 
serializer=serializer)
         def f():
             return 1
 
@@ -224,9 +223,20 @@ class TestExternalPythonDecorator:
         assert setup_task.is_setup
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_marking_external_python_task_as_teardown(self, dag_maker, 
venv_python):
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("pickle", id="pickle"),
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+            pytest.param(None, id="default"),
+        ],
+    )
+    def test_marking_external_python_task_as_teardown(
+        self, serializer, dag_maker, venv_python_with_cloudpickle_and_dill
+    ):
         @teardown
-        @task.external_python(python=venv_python)
+        @task.external_python(python=venv_python_with_cloudpickle_and_dill, 
serializer=serializer)
         def f():
             return 1
 
@@ -238,12 +248,21 @@ class TestExternalPythonDecorator:
         assert teardown_task.is_teardown
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("pickle", id="pickle"),
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+            pytest.param(None, id="default"),
+        ],
+    )
     @pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
     def test_marking_external_python_task_as_teardown_with_on_failure_fail(
-        self, dag_maker, on_failure_fail_dagrun, venv_python
+        self, serializer, dag_maker, on_failure_fail_dagrun, 
venv_python_with_cloudpickle_and_dill
     ):
         @teardown(on_failure_fail_dagrun=on_failure_fail_dagrun)
-        @task.external_python(python=venv_python)
+        @task.external_python(python=venv_python_with_cloudpickle_and_dill, 
serializer=serializer)
         def f():
             return 1
 
diff --git a/tests/decorators/test_python_virtualenv.py 
b/tests/decorators/test_python_virtualenv.py
index 15f37d9a46..b91bcaae36 100644
--- a/tests/decorators/test_python_virtualenv.py
+++ b/tests/decorators/test_python_virtualenv.py
@@ -18,26 +18,30 @@
 from __future__ import annotations
 
 import datetime
-import logging
 import sys
+from importlib.util import find_spec
 from subprocess import CalledProcessError
 
 import pytest
 
 from airflow.decorators import setup, task, teardown
+from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.utils import timezone
 
-log = logging.getLogger(__name__)
-
 pytestmark = pytest.mark.db_test
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}"
+DILL_INSTALLED = find_spec("dill") is not None
+DILL_MARKER = pytest.mark.skipif(not DILL_INSTALLED, reason="`dill` is not 
installed")
+CLOUDPICKLE_INSTALLED = find_spec("cloudpickle") is not None
+CLOUDPICKLE_MARKER = pytest.mark.skipif(not CLOUDPICKLE_INSTALLED, 
reason="`cloudpickle` is not installed")
 
 
 class TestPythonVirtualenvDecorator:
+    @CLOUDPICKLE_MARKER
     def test_add_cloudpickle(self, dag_maker):
-        @task.virtualenv(use_cloudpickle=True, system_site_packages=False)
+        @task.virtualenv(serializer="cloudpickle", system_site_packages=False)
         def f():
             """Ensure cloudpickle is correctly installed."""
             import cloudpickle  # noqa: F401
@@ -47,25 +51,31 @@ class TestPythonVirtualenvDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
+    @DILL_MARKER
     def test_add_dill(self, dag_maker):
-        @task.virtualenv(use_dill=True, system_site_packages=False)
+        @task.virtualenv(serializer="dill", system_site_packages=False)
         def f():
             """Ensure dill is correctly installed."""
-            try:
-                import dill  # noqa: F401
-            except ImportError:
-                import logging
-
-                _log = logging.getLogger(__name__)
-                _log.warning(
-                    "Dill package is required to be installed. Please install 
it with: pip install [dill]"
-                )
+            import dill  # noqa: F401
 
         with dag_maker():
             ret = f()
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
+    @DILL_MARKER
+    def test_add_dill_use_dill(self, dag_maker):
+        @task.virtualenv(use_dill=True, system_site_packages=False)
+        def f():
+            """Ensure dill is correctly installed."""
+            import dill  # noqa: F401
+
+        with pytest.warns(RemovedInAirflow3Warning, match="`use_dill` is 
deprecated and will be removed"):
+            with dag_maker():
+                ret = f()
+
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
     def test_no_requirements(self, dag_maker):
         """Tests that the python callable is invoked on task run."""
 
@@ -78,8 +88,15 @@ class TestPythonVirtualenvDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_no_system_site_packages(self, dag_maker):
-        @task.virtualenv(system_site_packages=False, 
python_version=PYTHON_VERSION, use_cloudpickle=True)
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+        ],
+    )
+    def test_no_system_site_packages(self, serializer, dag_maker):
+        @task.virtualenv(system_site_packages=False, 
python_version=PYTHON_VERSION, serializer=serializer)
         def f():
             try:
                 import funcsigs  # noqa: F401
@@ -92,12 +109,19 @@ class TestPythonVirtualenvDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_system_site_packages_cloudpickle(self, dag_maker):
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+        ],
+    )
+    def test_system_site_packages(self, serializer, dag_maker):
         @task.virtualenv(
             system_site_packages=False,
             requirements=["funcsigs"],
             python_version=PYTHON_VERSION,
-            use_cloudpickle=True,
+            serializer=serializer,
         )
         def f():
             import funcsigs  # noqa: F401
@@ -107,27 +131,21 @@ class TestPythonVirtualenvDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_system_site_packages_dill(self, dag_maker):
-        @task.virtualenv(
-            system_site_packages=False,
-            requirements=["funcsigs"],
-            python_version=PYTHON_VERSION,
-            use_dill=True,
-        )
-        def f():
-            import funcsigs  # noqa: F401
-
-        with dag_maker():
-            ret = f()
-
-        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
-    def test_with_requirements_pinned_cloudpickle(self, dag_maker):
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("pickle", id="pickle"),
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+            pytest.param(None, id="default"),
+        ],
+    )
+    def test_with_requirements_pinned(self, serializer, dag_maker):
         @task.virtualenv(
             system_site_packages=False,
             requirements=["funcsigs==0.4"],
             python_version=PYTHON_VERSION,
-            use_cloudpickle=True,
+            serializer=serializer,
         )
         def f():
             import funcsigs
@@ -140,25 +158,16 @@ class TestPythonVirtualenvDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_with_requirements_pinned_dill(self, dag_maker):
-        @task.virtualenv(
-            system_site_packages=False,
-            requirements=["funcsigs==0.4"],
-            python_version=PYTHON_VERSION,
-            use_dill=True,
-        )
-        def f():
-            import funcsigs
-
-            if funcsigs.__version__ != "0.4":
-                raise Exception
-
-        with dag_maker():
-            ret = f()
-
-        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
-    def test_with_requirements_file_cloudpickle(self, dag_maker, tmp_path):
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("pickle", id="pickle"),
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+            pytest.param(None, id="default"),
+        ],
+    )
+    def test_with_requirements_file(self, serializer, dag_maker, tmp_path):
         requirements_file = tmp_path / "requirements.txt"
         requirements_file.write_text("funcsigs==0.4\nattrs==23.1.0")
 
@@ -166,7 +175,7 @@ class TestPythonVirtualenvDecorator:
             system_site_packages=False,
             requirements="requirements.txt",
             python_version=PYTHON_VERSION,
-            use_cloudpickle=True,
+            serializer=serializer,
         )
         def f():
             import funcsigs
@@ -184,53 +193,21 @@ class TestPythonVirtualenvDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_with_requirements_file_dill(self, dag_maker, tmp_path):
-        requirements_file = tmp_path / "requirements.txt"
-        requirements_file.write_text("funcsigs==0.4\nattrs==23.1.0")
-
+    @pytest.mark.parametrize(
+        "serializer, extra_requirements",
+        [
+            pytest.param("pickle", [], id="pickle"),
+            pytest.param("dill", ["dill"], marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", ["cloudpickle"], 
marks=CLOUDPICKLE_MARKER, id="cloudpickle"),
+            pytest.param(None, [], id="default"),
+        ],
+    )
+    def test_unpinned_requirements(self, serializer, extra_requirements, 
dag_maker):
         @task.virtualenv(
             system_site_packages=False,
-            requirements="requirements.txt",
+            requirements=["funcsigs", *extra_requirements],
             python_version=PYTHON_VERSION,
-            use_dill=True,
-        )
-        def f():
-            import funcsigs
-
-            if funcsigs.__version__ != "0.4":
-                raise Exception
-
-            import attrs
-
-            if attrs.__version__ != "23.1.0":
-                raise Exception
-
-        with dag_maker(template_searchpath=tmp_path.as_posix()):
-            ret = f()
-
-        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
-    def test_unpinned_requirements_cloudpickle(self, dag_maker):
-        @task.virtualenv(
-            system_site_packages=False,
-            requirements=["funcsigs", "cloudpickle"],
-            python_version=PYTHON_VERSION,
-            use_cloudpickle=True,
-        )
-        def f():
-            import funcsigs  # noqa: F401
-
-        with dag_maker():
-            ret = f()
-
-        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
-    def test_unpinned_requirements_dill(self, dag_maker):
-        @task.virtualenv(
-            system_site_packages=False,
-            requirements=["funcsigs", "dill"],
-            python_version=PYTHON_VERSION,
-            use_dill=True,
+            serializer=serializer,
         )
         def f():
             import funcsigs  # noqa: F401
@@ -240,7 +217,16 @@ class TestPythonVirtualenvDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_fail(self, dag_maker):
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("pickle", id="pickle"),
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+            pytest.param(None, id="default"),
+        ],
+    )
+    def test_fail(self, serializer, dag_maker):
         @task.virtualenv()
         def f():
             raise Exception
@@ -251,8 +237,17 @@ class TestPythonVirtualenvDecorator:
         with pytest.raises(CalledProcessError):
             ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_python_3_cloudpickle(self, dag_maker):
-        @task.virtualenv(python_version="3", use_cloudpickle=False, 
requirements=["cloudpickle"])
+    @pytest.mark.parametrize(
+        "serializer, extra_requirements",
+        [
+            pytest.param("pickle", [], id="pickle"),
+            pytest.param("dill", ["dill"], marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", ["cloudpickle"], 
marks=CLOUDPICKLE_MARKER, id="cloudpickle"),
+            pytest.param(None, [], id="default"),
+        ],
+    )
+    def test_python_3(self, serializer, extra_requirements, dag_maker):
+        @task.virtualenv(python_version="3", serializer=serializer, 
requirements=extra_requirements)
         def f():
             import sys
 
@@ -268,25 +263,17 @@ class TestPythonVirtualenvDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_python_3_dill(self, dag_maker):
-        @task.virtualenv(python_version="3", use_dill=False, 
requirements=["dill"])
-        def f():
-            import sys
-
-            print(sys.version)
-            try:
-                {}.iteritems()
-            except AttributeError:
-                return
-            raise Exception
-
-        with dag_maker():
-            ret = f()
-
-        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
-    def test_with_args(self, dag_maker):
-        @task.virtualenv
+    @pytest.mark.parametrize(
+        "serializer, extra_requirements",
+        [
+            pytest.param("pickle", [], id="pickle"),
+            pytest.param("dill", ["dill"], marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", ["cloudpickle"], 
marks=CLOUDPICKLE_MARKER, id="cloudpickle"),
+            pytest.param(None, [], id="default"),
+        ],
+    )
+    def test_with_args(self, serializer, extra_requirements, dag_maker):
+        @task.virtualenv(serializer=serializer, 
requirements=extra_requirements)
         def f(a, b, c=False, d=False):
             if a == 0 and b == 1 and c and not d:
                 return True
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 8a31043fd8..2ab71be86e 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -28,6 +28,7 @@ import warnings
 from collections import namedtuple
 from datetime import date, datetime, timedelta, timezone as _timezone
 from functools import partial
+from importlib.util import find_spec
 from subprocess import CalledProcessError
 from tempfile import TemporaryDirectory
 from typing import TYPE_CHECKING, Generator
@@ -65,20 +66,21 @@ from airflow.utils.types import NOTSET, DagRunType
 from tests.test_utils import AIRFLOW_MAIN_FOLDER
 from tests.test_utils.db import clear_db_runs
 
-log = logging.getLogger(__name__)
+if TYPE_CHECKING:
+    from airflow.models.dagrun import DagRun
 
 pytestmark = pytest.mark.db_test
 
 
-if TYPE_CHECKING:
-    from airflow.models.dagrun import DagRun
-
 TI = TaskInstance
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 TEMPLATE_SEARCHPATH = os.path.join(AIRFLOW_MAIN_FOLDER, "tests", 
"config_templates")
 LOGGER_NAME = "airflow.task.operators"
 DEFAULT_PYTHON_VERSION = f"{sys.version_info[0]}.{sys.version_info[1]}"
-PY311 = sys.version_info >= (3, 11)
+DILL_INSTALLED = find_spec("dill") is not None
+DILL_MARKER = pytest.mark.skipif(not DILL_INSTALLED, reason="`dill` is not 
installed")
+CLOUDPICKLE_INSTALLED = find_spec("cloudpickle") is not None
+CLOUDPICKLE_MARKER = pytest.mark.skipif(not CLOUDPICKLE_INSTALLED, 
reason="`cloudpickle` is not installed")
 
 
 class BasePythonTest:
@@ -815,14 +817,23 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
         task = self.run_as_task(f, templates_dict={"ds": "{{ ds }}"})
         assert task.templates_dict == {"ds": self.ds_templated}
 
-    def test_deepcopy(self):
-        """Test that PythonVirtualenvOperator are deep-copyable."""
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("pickle", id="pickle"),
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+            pytest.param(None, id="default"),
+        ],
+    )
+    def test_deepcopy(self, serializer):
+        """Test that operator are deep-copyable."""
 
         def f():
             return 1
 
-        task = PythonVirtualenvOperator(python_callable=f, task_id="task")
-        copy.deepcopy(task)
+        op = self.opcls(task_id="task", python_callable=f, 
**self.default_kwargs())
+        copy.deepcopy(op)
 
     def test_virtualenv_serializable_context_fields(self, 
create_task_instance):
         """Ensure all template context fields are listed in the operator.
@@ -892,6 +903,34 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
             )
             assert ti.state == expected_state
 
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param(
+                "dill",
+                marks=pytest.mark.skipif(
+                    DILL_INSTALLED, reason="For this test case `dill` 
shouldn't be installed"
+                ),
+                id="dill",
+            ),
+            pytest.param(
+                "cloudpickle",
+                marks=pytest.mark.skipif(
+                    CLOUDPICKLE_INSTALLED, reason="For this test case 
`cloudpickle` shouldn't be installed"
+                ),
+                id="cloudpickle",
+            ),
+        ],
+    )
+    def test_advanced_serializer_not_installed(self, serializer, caplog):
+        """Test case for check raising an error if dill/cloudpickle is not 
installed."""
+
+        def f(a): ...
+
+        with pytest.raises(ModuleNotFoundError):
+            self.run_as_task(f, op_args=[42], serializer=serializer)
+        assert f"Unable to import `{serializer}` module." in caplog.text
+
 
 venv_cache_path = tempfile.mkdtemp(prefix="venv_cache_path")
 
@@ -927,56 +966,65 @@ class 
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
         with pytest.raises(AirflowException, match="requires virtualenv"):
             self.run_as_task(f)
 
+    @CLOUDPICKLE_MARKER
     def test_add_cloudpickle(self):
         def f():
             """Ensure cloudpickle is correctly installed."""
-            try:
-                import cloudpickle  # noqa: F401
-            except ImportError:
-                import logging
-
-                _log = logging.getLogger(__name__)
-                _log.warning(
-                    "Cloudpickle package is required to be installed."
-                    " Please install it with: pip install [cloudpickle]"
-                )
+            import cloudpickle  # noqa: F401
 
-        self.run_as_task(f, use_cloudpickle=True, system_site_packages=False)
+        self.run_as_task(f, serializer="cloudpickle", 
system_site_packages=False)
 
+    @DILL_MARKER
     def test_add_dill(self):
         def f():
             """Ensure dill is correctly installed."""
-            try:
-                import dill  # noqa: F401
-            except ImportError:
-                import logging
+            import dill  # noqa: F401
 
-                _log = logging.getLogger(__name__)
-                _log.warning(
-                    "Dill package is required to be installed. Please install 
it with: pip install [dill]"
-                )
+        self.run_as_task(f, serializer="dill", system_site_packages=False)
 
-        self.run_as_task(f, use_dill=True, system_site_packages=False)
+    @DILL_MARKER
+    def test_add_dill_use_dill(self):
+        def f():
+            """Ensure dill is correctly installed."""
+            import dill  # noqa: F401
 
-    def test_no_requirements(self):
-        """Tests that the python callable is invoked on task run."""
+        with pytest.warns(RemovedInAirflow3Warning, match="`use_dill` is 
deprecated and will be removed"):
+            self.run_as_task(f, use_dill=True, system_site_packages=False)
 
+    def test_ambiguous_serializer(self):
         def f():
             pass
 
-        self.run_as_task(f)
+        with pytest.warns(RemovedInAirflow3Warning, match="`use_dill` is 
deprecated and will be removed"):
+            with pytest.raises(AirflowException, match="Both 'use_dill' and 
'serializer' parameters are set"):
+                self.run_as_task(f, use_dill=True, serializer="dill")
 
-    def test_no_system_site_packages_cloudpickle(self):
+    def test_invalid_serializer(self):
         def f():
-            try:
-                import funcsigs  # noqa: F401
-            except ImportError:
-                return True
-            raise RuntimeError
+            """Ensure dill is correctly installed."""
+            import dill  # noqa: F401
+
+        with pytest.raises(AirflowException, match="Unsupported serializer 
'airflow'"):
+            self.run_as_task(f, serializer="airflow")
 
-        self.run_as_task(f, system_site_packages=False, 
requirements=["cloudpickle"])
+    def test_no_requirements(self):
+        """Tests that the python callable is invoked on task run."""
 
-    def test_no_system_site_packages_dill(self):
+        def f():
+            pass
+
+        self.run_as_task(f)
+
+    @pytest.mark.parametrize(
+        "serializer, extra_requirements",
+        [
+            pytest.param("pickle", [], id="pickle"),
+            pytest.param("dill", ["dill"], marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", ["cloudpickle"], 
marks=CLOUDPICKLE_MARKER, id="cloudpickle"),
+            pytest.param(None, [], id="default"),
+        ],
+    )
+    def test_no_system_site_packages(self, serializer, extra_requirements):
         def f():
             try:
                 import funcsigs  # noqa: F401
@@ -984,7 +1032,7 @@ class 
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
                 return True
             raise RuntimeError
 
-        self.run_as_task(f, system_site_packages=False, requirements=["dill"])
+        self.run_as_task(f, system_site_packages=False, 
requirements=extra_requirements)
 
     def test_system_site_packages(self):
         def f():
@@ -1017,29 +1065,35 @@ class 
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
 
         self.run_as_task(f, requirements=["funcsigs==0.4"], 
do_not_use_caching=True)
 
-    def test_unpinned_requirements_cloudpickle(self):
-        def f():
-            import funcsigs  # noqa: F401
-
-        self.run_as_task(f, requirements=["funcsigs", "cloudpickle"], 
system_site_packages=False)
-
-    def test_unpinned_requirements_dill(self):
-        def f():
-            import funcsigs  # noqa: F401
-
-        self.run_as_task(f, requirements=["funcsigs", "dill"], 
system_site_packages=False)
-
-    def test_range_requirements_cloudpickle(self):
+    @pytest.mark.parametrize(
+        "serializer, extra_requirements",
+        [
+            pytest.param("pickle", [], id="pickle"),
+            pytest.param("dill", ["dill"], marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", ["cloudpickle"], 
marks=CLOUDPICKLE_MARKER, id="cloudpickle"),
+            pytest.param(None, [], id="default"),
+        ],
+    )
+    def test_unpinned_requirements(self, serializer, extra_requirements):
         def f():
             import funcsigs  # noqa: F401
 
-        self.run_as_task(f, requirements=["funcsigs>1.0", "cloudpickle"], 
system_site_packages=False)
+        self.run_as_task(f, requirements=["funcsigs", *extra_requirements], 
system_site_packages=False)
 
-    def test_range_requirements_dill(self):
+    @pytest.mark.parametrize(
+        "serializer, extra_requirements",
+        [
+            pytest.param("pickle", [], id="pickle"),
+            pytest.param("dill", ["dill"], marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", ["cloudpickle"], 
marks=CLOUDPICKLE_MARKER, id="cloudpickle"),
+            pytest.param(None, [], id="default"),
+        ],
+    )
+    def test_range_requirements(self, serializer, extra_requirements):
         def f():
             import funcsigs  # noqa: F401
 
-        self.run_as_task(f, requirements=["funcsigs>1.0", "dill"], 
system_site_packages=False)
+        self.run_as_task(f, requirements=["funcsigs>1.0", 
*extra_requirements], system_site_packages=False)
 
     def test_requirements_file(self):
         def f():
@@ -1069,21 +1123,16 @@ class 
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
             pip_install_options=["--no-deps"],
         )
 
-    def test_templated_requirements_file_cloudpickle(self):
-        def f():
-            import funcsigs
-
-            assert funcsigs.__version__ == "1.0.2"
-
-        self.run_as_operator(
-            f,
-            requirements="requirements.txt",
-            use_cloudpickle=True,
-            params={"environ": "templated_unit_test"},
-            system_site_packages=False,
-        )
-
-    def test_templated_requirements_file_dill(self):
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("pickle", id="pickle"),
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+            pytest.param(None, id="default"),
+        ],
+    )
+    def test_templated_requirements_file(self, serializer):
         def f():
             import funcsigs
 
@@ -1092,25 +1141,21 @@ class 
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
         self.run_as_operator(
             f,
             requirements="requirements.txt",
-            use_dill=True,
+            serializer=serializer,
             params={"environ": "templated_unit_test"},
             system_site_packages=False,
         )
 
-    def test_python_3_cloudpickle(self):
-        def f():
-            import sys
-
-            print(sys.version)
-            try:
-                {}.iteritems()
-            except AttributeError:
-                return
-            raise RuntimeError
-
-        self.run_as_task(f, python_version="3", use_cloudpickle=False, 
requirements=["cloudpickle"])
-
-    def test_python_3_dill(self):
+    @pytest.mark.parametrize(
+        "serializer, extra_requirements",
+        [
+            pytest.param("pickle", [], id="pickle"),
+            pytest.param("dill", ["dill"], marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", ["cloudpickle"], 
marks=CLOUDPICKLE_MARKER, id="cloudpickle"),
+            pytest.param(None, [], id="default"),
+        ],
+    )
+    def test_python_3_serializers(self, serializer, extra_requirements):
         def f():
             import sys
 
@@ -1121,19 +1166,16 @@ class 
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
                 return
             raise RuntimeError
 
-        self.run_as_task(f, python_version="3", use_dill=False, 
requirements=["dill"])
-
-    def test_without_cloudpickle(self):
-        def f(a):
-            return a
-
-        self.run_as_task(f, system_site_packages=False, use_cloudpickle=False, 
op_args=[4])
+        with pytest.warns(
+            RemovedInAirflow3Warning, match="Passing non-string 
types.*python_version is deprecated"
+        ):
+            self.run_as_task(f, python_version=3, serializer=serializer, 
requirements=extra_requirements)
 
-    def test_without_dill(self):
+    def test_with_default(self):
         def f(a):
             return a
 
-        self.run_as_task(f, system_site_packages=False, use_dill=False, 
op_args=[4])
+        self.run_as_task(f, system_site_packages=False, op_args=[4])
 
     def test_with_index_urls(self):
         def f(a):
@@ -1158,67 +1200,39 @@ class 
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
             self.run_as_task(f, venv_cache_path=tmp_dir, op_args=[4])
 
     # This tests might take longer than default 60 seconds as it is 
serializing a lot of
-    # context using cloudpickle (which is slow apparently).
+    # context using dill/cloudpickle (which is slow apparently).
     @pytest.mark.execution_timeout(120)
     
@pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
-    @pytest.mark.skipif(
-        os.environ.get("PYTEST_PLAIN_ASSERTS") != "true",
-        reason="assertion rewriting breaks this test because cloudpickle will 
try to serialize "
-        "AssertRewritingHook including captured stdout and we need to run "
-        "it with `--assert=plain`pytest option and PYTEST_PLAIN_ASSERTS=true 
.",
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param(
+                "dill",
+                marks=[
+                    DILL_MARKER,
+                    pytest.mark.xfail(
+                        sys.version_info[:2] == (3, 11),
+                        reason=(
+                            "Also this test is failed on Python 3.11 because 
of impact of "
+                            "regression in Python 3.11 connected likely with 
CodeType behaviour "
+                            "https://github.com/python/cpython/issues/100316. "
+                            "That likely causes that dill is not able to 
serialize the `conf` correctly. "
+                            "Issue about fixing it is captured in 
https://github.com/apache/airflow/issues/35307";
+                        ),
+                    ),
+                ],
+                id="dill",
+            ),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+        ],
     )
-    def test_airflow_context(self):
-        def f(
-            # basic
-            ds_nodash,
-            inlets,
-            next_ds,
-            next_ds_nodash,
-            outlets,
-            params,
-            prev_ds,
-            prev_ds_nodash,
-            run_id,
-            task_instance_key_str,
-            test_mode,
-            tomorrow_ds,
-            tomorrow_ds_nodash,
-            ts,
-            ts_nodash,
-            ts_nodash_with_tz,
-            yesterday_ds,
-            yesterday_ds_nodash,
-            # pendulum-specific
-            execution_date,
-            next_execution_date,
-            prev_execution_date,
-            prev_execution_date_success,
-            prev_start_date_success,
-            prev_end_date_success,
-            # airflow-specific
-            macros,
-            conf,
-            dag,
-            dag_run,
-            task,
-            # other
-            **context,
-        ):
-            pass
-
-        self.run_as_operator(f, use_cloudpickle=True, 
system_site_packages=True, requirements=None)
-
-    # This tests might take longer than default 60 seconds as it is 
serializing a lot of
-    # context using dill (which is slow apparently).
-    @pytest.mark.execution_timeout(120)
-    
@pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
     @pytest.mark.skipif(
         os.environ.get("PYTEST_PLAIN_ASSERTS") != "true",
-        reason="assertion rewriting breaks this test because dill will try to 
serialize "
+        reason="assertion rewriting breaks this test because serializer will 
try to serialize "
         "AssertRewritingHook including captured stdout and we need to run "
-        "it with `--assert=plain`pytest option and PYTEST_PLAIN_ASSERTS=true 
.",
+        "it with `--assert=plain` pytest option and PYTEST_PLAIN_ASSERTS=true 
.",
     )
-    def test_airflow_context_dill(self):
+    def test_airflow_context(self, serializer):
         def f(
             # basic
             ds_nodash,
@@ -1257,45 +1271,17 @@ class 
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
         ):
             pass
 
-        self.run_as_operator(f, use_dill=True, system_site_packages=True, 
requirements=None)
+        self.run_as_operator(f, serializer=serializer, 
system_site_packages=True, requirements=None)
 
     
@pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
-    def test_pendulum_context(self):
-        def f(
-            # basic
-            ds_nodash,
-            inlets,
-            next_ds,
-            next_ds_nodash,
-            outlets,
-            prev_ds,
-            prev_ds_nodash,
-            run_id,
-            task_instance_key_str,
-            test_mode,
-            tomorrow_ds,
-            tomorrow_ds_nodash,
-            ts,
-            ts_nodash,
-            ts_nodash_with_tz,
-            yesterday_ds,
-            yesterday_ds_nodash,
-            # pendulum-specific
-            execution_date,
-            next_execution_date,
-            prev_execution_date,
-            prev_execution_date_success,
-            prev_start_date_success,
-            prev_end_date_success,
-            # other
-            **context,
-        ):
-            pass
-
-        self.run_as_task(f, use_cloudpickle=True, system_site_packages=False, 
requirements=["pendulum"])
-
-    
@pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
-    def test_pendulum_context_dill(self):
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+        ],
+    )
+    def test_pendulum_context(self, serializer):
         def f(
             # basic
             ds_nodash,
@@ -1327,38 +1313,19 @@ class 
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
         ):
             pass
 
-        self.run_as_task(f, use_dill=True, system_site_packages=False, 
requirements=["pendulum"])
-
-    
@pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
-    def test_base_context(self):
-        def f(
-            # basic
-            ds_nodash,
-            inlets,
-            next_ds,
-            next_ds_nodash,
-            outlets,
-            prev_ds,
-            prev_ds_nodash,
-            run_id,
-            task_instance_key_str,
-            test_mode,
-            tomorrow_ds,
-            tomorrow_ds_nodash,
-            ts,
-            ts_nodash,
-            ts_nodash_with_tz,
-            yesterday_ds,
-            yesterday_ds_nodash,
-            # other
-            **context,
-        ):
-            pass
-
-        self.run_as_task(f, use_cloudpickle=True, system_site_packages=False, 
requirements=None)
+        self.run_as_task(f, serializer=serializer, system_site_packages=False, 
requirements=["pendulum"])
 
     
@pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
-    def test_base_context_dill(self):
+    @pytest.mark.parametrize(
+        "serializer",
+        [
+            pytest.param("pickle", id="pickle"),
+            pytest.param("dill", marks=DILL_MARKER, id="dill"),
+            pytest.param("cloudpickle", marks=CLOUDPICKLE_MARKER, 
id="cloudpickle"),
+            pytest.param(None, id="default"),
+        ],
+    )
+    def test_base_context(self, serializer):
         def f(
             # basic
             ds_nodash,
@@ -1383,7 +1350,7 @@ class 
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
         ):
             pass
 
-        self.run_as_task(f, use_dill=True, system_site_packages=False, 
requirements=None)
+        self.run_as_task(f, serializer=serializer, system_site_packages=False, 
requirements=None)
 
 
 # when venv tests are run in parallel to other test they create new processes 
and this might take
@@ -1629,8 +1596,6 @@ class 
BaseTestBranchPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
 # when venv tests are run in parallel to other test they create new processes 
and this might take
 # quite some time in shared docker environment and get some contention even 
between different containers
 # therefore we have to extend timeouts for those tests
-
-
 @pytest.mark.execution_timeout(120)
 @pytest.mark.virtualenv_operator
 class 
TestBranchPythonVirtualenvOperator(BaseTestBranchPythonVirtualenvOperator):

Reply via email to