This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e7166bb759 Adds support for capacity providers to ECS Executor (#36722)
e7166bb759 is described below

commit e7166bb7594086ace1c8e34f358e236ab0dcd2b7
Author: D. Ferruzzi <ferru...@amazon.com>
AuthorDate: Thu Jan 11 11:55:22 2024 -0800

    Adds support for capacity providers to ECS Executor (#36722)
    
    Add support for Capacity Providers
---
 .../aws/executors/ecs/ecs_executor_config.py       | 17 ++++++
 .../providers/amazon/aws/executors/ecs/utils.py    |  2 +-
 airflow/providers/amazon/provider.yaml             | 19 ++++++-
 .../amazon/aws/executors/ecs/test_ecs_executor.py  | 65 +++++++++++++++++++++-
 4 files changed, 99 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py 
b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py
index f0fa97852a..4f57b72d96 100644
--- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py
+++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py
@@ -40,6 +40,7 @@ from airflow.providers.amazon.aws.executors.ecs.utils import (
     camelize_dict_keys,
     parse_assign_public_ip,
 )
+from airflow.providers.amazon.aws.hooks.ecs import EcsHook
 from airflow.utils.helpers import prune_dict
 
 
@@ -60,6 +61,22 @@ def build_task_kwargs() -> dict:
     task_kwargs = _fetch_config_values()
     task_kwargs.update(_fetch_templated_kwargs())
 
+    has_launch_type: bool = "launch_type" in task_kwargs
+    has_capacity_provider: bool = "capacity_provider_strategy" in task_kwargs
+
+    if has_capacity_provider and has_launch_type:
+        raise ValueError(
+            "capacity_provider_strategy and launch_type are mutually 
exclusive, you can not provide both."
+        )
+    elif "cluster" in task_kwargs and not (has_capacity_provider or 
has_launch_type):
+        # Default API behavior if neither is provided is to fall back on the 
default capacity
+        # provider if it exists. Since it is not a required value, check if 
there is one
+        # before using it, and if there is not then use the FARGATE 
launch_type as
+        # the final fallback.
+        cluster = 
EcsHook().conn.describe_clusters(clusters=[task_kwargs["cluster"]])["clusters"][0]
+        if not cluster.get("defaultCapacityProviderStrategy"):
+            task_kwargs["launch_type"] = "FARGATE"
+
     # There can only be 1 count of these containers
     task_kwargs["count"] = 1  # type: ignore
     # There could be a generic approach to the below, but likely more 
convoluted then just manually ensuring
diff --git a/airflow/providers/amazon/aws/executors/ecs/utils.py 
b/airflow/providers/amazon/aws/executors/ecs/utils.py
index 4966fa3d2b..7913bdf227 100644
--- a/airflow/providers/amazon/aws/executors/ecs/utils.py
+++ b/airflow/providers/amazon/aws/executors/ecs/utils.py
@@ -44,7 +44,6 @@ CONFIG_DEFAULTS = {
     "conn_id": "aws_default",
     "max_run_task_attempts": "3",
     "assign_public_ip": "False",
-    "launch_type": "FARGATE",
     "platform_version": "LATEST",
     "check_health_on_startup": "True",
 }
@@ -81,6 +80,7 @@ class RunTaskKwargsConfigKeys(BaseConfigKeys):
     """Keys loaded into the config which are valid ECS run_task kwargs."""
 
     ASSIGN_PUBLIC_IP = "assign_public_ip"
+    CAPACITY_PROVIDER_STRATEGY = "capacity_provider_strategy"
     CLUSTER = "cluster"
     LAUNCH_TYPE = "launch_type"
     PLATFORM_VERSION = "platform_version"
diff --git a/airflow/providers/amazon/provider.yaml 
b/airflow/providers/amazon/provider.yaml
index c8b1e8e50e..4cd8a1278a 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -838,6 +838,19 @@ config:
         type: string
         example: "ecs_executor_cluster"
         default: ~
+      capacity_provider_strategy:
+        description: |
+          The capacity provider strategy to use for the task.
+
+          If a Capacity Provider Strategy is specified, the Launch Type 
parameter must be omitted. If
+          no Capacity Provider Strategy or Launch Type is specified, the 
Default CapacityProvider Strategy
+          for the cluster is used, if present.
+
+          When you use cluster auto scaling, you must specify Capacity 
Provider Strategy and not Launch Type.
+        version_added: "8.17"
+        type: string
+        example: "[{'capacityProvider': 'cp1', 'weight': 5}, 
{'capacityProvider': 'cp2', 'weight': 1}]"
+        default: ~
       container_name:
         description: |
           Name of the container that will be used to execute Airflow tasks via 
the ECS executor.
@@ -853,6 +866,10 @@ config:
           Launch type can either be 'FARGATE' OR 'EC2'. For more info see url 
to
           Boto3 docs above.
 
+          If a Launch Type is specified, the Capacity Provider Strategy 
parameter must be omitted. If
+          no Capacity Provider Strategy or Launch Type is specified, the 
Default Capacity Provider Strategy
+          for the cluster is used, if present.
+
           If the launch type is EC2, the executor will attempt to place tasks 
on
           empty EC2 instances. If there are no EC2 instances available, no task
           is placed and this function will be called again in the next 
heart-beat.
@@ -862,7 +879,7 @@ config:
         version_added: "8.10"
         type: string
         example: "FARGATE"
-        default: "FARGATE"
+        default: ~
       platform_version:
         description: |
           The platform version the task uses. A platform version is only 
specified
diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py 
b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
index 600f2597d3..78c3c1bc28 100644
--- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -47,6 +47,7 @@ from airflow.providers.amazon.aws.executors.ecs.utils import (
     _recursive_flatten_dict,
     parse_assign_public_ip,
 )
+from airflow.providers.amazon.aws.hooks.ecs import EcsHook
 from airflow.utils.helpers import convert_camel_to_snake
 from airflow.utils.state import State, TaskInstanceState
 
@@ -943,9 +944,9 @@ class TestEcsExecutorConfig:
 
         assert task_kwargs["platformVersion"] == templated_version
 
-    def test_count_can_not_be_modified_by_the_user(self, assign_subnets):
+    @mock.patch.object(EcsHook, "conn")
+    def test_count_can_not_be_modified_by_the_user(self, _, assign_subnets):
         """The ``count`` parameter must always be 1; verify that the user can 
not override this value."""
-
         templated_version = "1"
         templated_cluster = "templated_cluster_name"
         provided_run_task_kwargs = {
@@ -1086,3 +1087,63 @@ class TestEcsExecutorConfig:
         executor.start()
 
         ecs_mock.stop_task.assert_not_called()
+
+    def test_providing_both_capacity_provider_and_launch_type_fails(self, 
set_env_vars):
+        os.environ[
+            
f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CAPACITY_PROVIDER_STRATEGY}".upper()
+        ] = "[{'capacityProvider': 'cp1', 'weight': 5}, {'capacityProvider': 
'cp2', 'weight': 1}]"
+        expected_error = (
+            "capacity_provider_strategy and launch_type are mutually 
exclusive, you can not provide both."
+        )
+
+        with pytest.raises(ValueError, match=expected_error):
+            AwsEcsExecutor()
+
+    def test_providing_capacity_provider(self, set_env_vars):
+        # If a capacity provider strategy is supplied without a launch type, 
use the strategy.
+
+        valid_capacity_provider = (
+            "[{'capacityProvider': 'cp1', 'weight': 5}, {'capacityProvider': 
'cp2', 'weight': 1}]"
+        )
+
+        os.environ[
+            
f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CAPACITY_PROVIDER_STRATEGY}".upper()
+        ] = valid_capacity_provider
+        
os.environ.pop(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.LAUNCH_TYPE}".upper())
+
+        from airflow.providers.amazon.aws.executors.ecs import 
ecs_executor_config
+
+        task_kwargs = ecs_executor_config.build_task_kwargs()
+
+        assert "launchType" not in task_kwargs
+        assert task_kwargs["capacityProviderStrategy"] == 
valid_capacity_provider
+
+    @mock.patch.object(EcsHook, "conn")
+    def 
test_providing_no_capacity_provider_no_lunch_type_with_cluster_default(self, 
mock_conn, set_env_vars):
+        # If no capacity provider strategy is supplied and no launch type, but 
the
+        # cluster has a default capacity provider strategy, use the cluster's 
default.
+        mock_conn.describe_clusters.return_value = {
+            "clusters": [{"defaultCapacityProviderStrategy": 
["some_strategy"]}]
+        }
+        
os.environ.pop(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.LAUNCH_TYPE}".upper())
+
+        from airflow.providers.amazon.aws.executors.ecs import 
ecs_executor_config
+
+        task_kwargs = ecs_executor_config.build_task_kwargs()
+        assert "launchType" not in task_kwargs
+        assert "capacityProviderStrategy" not in task_kwargs
+        assert mock_conn.describe_clusters.called_once()
+
+    @mock.patch.object(EcsHook, "conn")
+    def 
test_providing_no_capacity_provider_no_lunch_type_no_cluster_default(self, 
mock_conn, set_env_vars):
+        # If no capacity provider strategy is supplied and no launch type, and 
the cluster
+        # does not have a default capacity provider strategy, use the FARGATE 
launch type.
+
+        mock_conn.describe_clusters.return_value = {"clusters": [{"status": 
"ACTIVE"}]}
+
+        
os.environ.pop(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.LAUNCH_TYPE}".upper())
+
+        from airflow.providers.amazon.aws.executors.ecs import 
ecs_executor_config
+
+        task_kwargs = ecs_executor_config.build_task_kwargs()
+        assert task_kwargs["launchType"] == "FARGATE"

Reply via email to