This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9c833bf7018 Update DockerSwarmOperator auto_remove to align with
DockerOperator (#45745)
9c833bf7018 is described below
commit 9c833bf7018ffabe32b4c0d563114f3f6f3b4c1d
Author: Niklas Rousset <[email protected]>
AuthorDate: Mon Feb 10 11:52:05 2025 +0100
Update DockerSwarmOperator auto_remove to align with DockerOperator (#45745)
* Update DockerSwarmOperator auto_remove to align with DockerOperator
* add docker swarm auto remove test
---
.../providers/docker/operators/docker_swarm.py | 16 ++++-----
.../docker/operators/test_docker_swarm.py | 38 ++++++++++++++++++++--
2 files changed, 44 insertions(+), 10 deletions(-)
diff --git
a/providers/docker/src/airflow/providers/docker/operators/docker_swarm.py
b/providers/docker/src/airflow/providers/docker/operators/docker_swarm.py
index e899b20f648..200c03b1276 100644
--- a/providers/docker/src/airflow/providers/docker/operators/docker_swarm.py
+++ b/providers/docker/src/airflow/providers/docker/operators/docker_swarm.py
@@ -59,9 +59,11 @@ class DockerSwarmOperator(DockerOperator):
If image tag is omitted, "latest" will be used.
:param api_version: Remote API version. Set to ``auto`` to automatically
detect the server's version.
- :param auto_remove: Auto-removal of the container on daemon side when the
- container's process exits.
- The default is False.
+ :param auto_remove: Enable removal of the service when the service has
terminated. Possible values:
+
+ - ``never``: (default) do not remove service
+ - ``success``: remove on success
+ - ``force``: always remove service
:param command: Command to be run in the container. (templated)
:param args: Arguments to the command.
:param docker_url: URL of the host running the docker daemon.
@@ -214,18 +216,16 @@ class DockerSwarmOperator(DockerOperator):
container_id = task["Status"]["ContainerStatus"]["ContainerID"]
container = self.cli.inspect_container(container_id)
self.containers.append(container)
- else:
- raise AirflowException(f"Service did not complete:
{self.service!r}")
if self.retrieve_output:
return self._attempt_to_retrieve_results()
- self.log.info("auto_removeauto_removeauto_removeauto_removeauto_remove
: %s", str(self.auto_remove))
+ self.log.info("auto_remove: %s", str(self.auto_remove))
if self.service and self._service_status() != "complete":
- if self.auto_remove == "success":
+ if self.auto_remove == "force":
self.cli.remove_service(self.service["ID"])
raise AirflowException(f"Service did not complete:
{self.service!r}")
- elif self.auto_remove == "success":
+ elif self.auto_remove in ["success", "force"]:
if not self.service:
raise RuntimeError("The 'service' should be initialized
before!")
self.cli.remove_service(self.service["ID"])
diff --git
a/providers/docker/tests/provider_tests/docker/operators/test_docker_swarm.py
b/providers/docker/tests/provider_tests/docker/operators/test_docker_swarm.py
index 67976ccf4ac..0887785739b 100644
---
a/providers/docker/tests/provider_tests/docker/operators/test_docker_swarm.py
+++
b/providers/docker/tests/provider_tests/docker/operators/test_docker_swarm.py
@@ -130,7 +130,8 @@ class TestDockerSwarmOperator:
client_mock.remove_service.assert_called_once_with("some_id")
@mock.patch("airflow.providers.docker.operators.docker_swarm.types")
- def test_auto_remove(self, types_mock, docker_api_client_patcher):
+ @pytest.mark.parametrize("auto_remove", ["success", "force"])
+ def test_auto_remove(self, types_mock, docker_api_client_patcher,
auto_remove):
mock_obj = mock.Mock()
client_mock = mock.Mock(spec=APIClient)
@@ -148,12 +149,45 @@ class TestDockerSwarmOperator:
docker_api_client_patcher.return_value = client_mock
operator = DockerSwarmOperator(
- image="", auto_remove="success", task_id="unittest",
enable_logging=False
+ image="", auto_remove=auto_remove, task_id="unittest",
enable_logging=False
)
operator.execute(None)
client_mock.remove_service.assert_called_once_with("some_id")
+ @mock.patch("airflow.providers.docker.operators.docker_swarm.types")
+ @pytest.mark.parametrize(
+ "auto_remove,expected_remove_call", [("success", False), ("force",
True), ("never", False)]
+ )
+ def test_auto_remove_failed(
+ self, types_mock, docker_api_client_patcher, auto_remove,
expected_remove_call
+ ):
+ mock_obj = mock.Mock()
+
+ client_mock = mock.Mock(spec=APIClient)
+ client_mock.create_service.return_value = {"ID": "some_id"}
+ client_mock.images.return_value = []
+ client_mock.pull.return_value = [b'{"status":"pull log"}']
+ client_mock.tasks.return_value = [
+ {"Status": {"State": "failed", "ContainerStatus": {"ContainerID":
"some_id"}}}
+ ]
+ types_mock.TaskTemplate.return_value = mock_obj
+ types_mock.ContainerSpec.return_value = mock_obj
+ types_mock.RestartPolicy.return_value = mock_obj
+ types_mock.Resources.return_value = mock_obj
+
+ docker_api_client_patcher.return_value = client_mock
+
+ operator = DockerSwarmOperator(
+ image="", auto_remove=auto_remove, task_id="unittest",
enable_logging=False
+ )
+ try:
+ operator.execute(None)
+ except AirflowException:
+ pass
+
+ assert (client_mock.remove_service.call_count > 0) ==
expected_remove_call
+
@mock.patch("airflow.providers.docker.operators.docker_swarm.types")
def test_no_auto_remove(self, types_mock, docker_api_client_patcher):
mock_obj = mock.Mock()