This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f4d2b5dee Add AWS ECS Executor (#34381)
5f4d2b5dee is described below

commit 5f4d2b5dee2e98a05124865c6fa855bc26d0af26
Author: Niko Oliveira <oniko...@amazon.com>
AuthorDate: Wed Oct 25 10:32:42 2023 -0700

    Add AWS ECS Executor (#34381)
    
    * Introduce AWS Executors to the Amazon provider package. The EcsFargateExecutor
      and BatchExecutor launch Airflow tasks on the AWS ECS/Fargate service and the
      AWS Batch service, respectively.
    ---------
    
    Co-authored-by: Ahmed Elzeiny <ahmed.elze...@gmail.com>
    Co-authored-by: ferruzzi <ferru...@amazon.com>
    Co-authored-by: Syed Hussain <syeda...@amazon.com>
    Co-authored-by: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com>
    Co-authored-by: Andrey Anshin <andrey.ans...@taragol.is>
---
 airflow/providers/amazon/aws/executors/__init__.py |  16 +
 .../providers/amazon/aws/executors/ecs/Dockerfile  | 107 +++
 .../providers/amazon/aws/executors/ecs/__init__.py | 335 ++++++++
 .../amazon/aws/executors/ecs/boto_schema.py        | 107 +++
 .../aws/executors/ecs/ecs_executor_config.py       | 108 +++
 .../providers/amazon/aws/executors/ecs/utils.py    | 264 ++++++
 airflow/providers/amazon/aws/hooks/base_aws.py     |  34 +-
 airflow/providers/amazon/provider.yaml             | 113 +++
 .../executors/ecs-executor.rst                     | 594 +++++++++++++
 .../executors/index.rst                            |  27 +
 docs/apache-airflow-providers-amazon/index.rst     |   1 +
 docs/spelling_wordlist.txt                         |   3 +
 generated/provider_dependencies.json               |   1 +
 tests/core/test_configuration.py                   |   1 +
 tests/providers/amazon/aws/executors/__init__.py   |  16 +
 .../providers/amazon/aws/executors/ecs/__init__.py |  16 +
 .../amazon/aws/executors/ecs/test_ecs_executor.py  | 919 +++++++++++++++++++++
 tests/providers/amazon/aws/hooks/test_base_aws.py  |  18 +-
 18 files changed, 2673 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/amazon/aws/executors/__init__.py b/airflow/providers/amazon/aws/executors/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/amazon/aws/executors/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/amazon/aws/executors/ecs/Dockerfile b/airflow/providers/amazon/aws/executors/ecs/Dockerfile
new file mode 100644
index 0000000000..f701ab6712
--- /dev/null
+++ b/airflow/providers/amazon/aws/executors/ecs/Dockerfile
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# hadolint ignore=DL3007
+FROM apache/airflow:latest
+USER root
+RUN apt-get update \
+  && apt-get install -y --no-install-recommends unzip \
+  # The below helps to keep the image size down
+  && apt-get clean \
+  && rm -rf /var/lib/apt/lists/*
+
+RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip"; -o 
"awscliv2.zip" && \
+    unzip awscliv2.zip \
+    && ./aws/install \
+    && rm -rf ./aws awscliv2.zip
+
+RUN curl "https://raw.githubusercontent.com/apache/airflow/main/scripts/docker/entrypoint_prod.sh" -o "/home/airflow/entrypoint.sh" \
+    && chmod +x /home/airflow/entrypoint.sh
+
+# Add a script to run the aws s3 sync command when the container is run
+COPY <<"EOF" /install_dags_entrypoint.sh
+#!/bin/bash
+
+echo "Downloading DAGs from S3 bucket"
+aws s3 sync "$S3_URL" "$CONTAINER_DAG_PATH"
+
+/home/airflow/entrypoint.sh "$@"
+EOF
+
+RUN chmod +x /install_dags_entrypoint.sh
+
+USER airflow
+
+## Installing Python Dependencies
+# Python dependencies can be installed by providing a requirements.txt.
+# If the file is in a different location, use the requirements_path build argument to specify
+# the file path.
+ARG requirements_path=./requirements.txt
+ENV REQUIREMENTS_PATH=$requirements_path
+
+# Uncomment the two lines below to copy the requirements.txt file to the container, and
+# install the dependencies.
+# COPY --chown=airflow:root $REQUIREMENTS_PATH /opt/airflow/requirements.txt
+# RUN pip install --no-cache-dir -r /opt/airflow/requirements.txt
+
+
+## AWS Authentication
+# The image requires access to AWS services. This Dockerfile supports 2 ways to authenticate with AWS.
+# The first is using build arguments where you can provide the AWS credentials as arguments
+# passed when building the image. The other option is to leverage the ECS execution role which is automatically
+# supported by the container running on ECS. Airflow will default to using the Boto credential strategy which will
+# look for roles from ECS. See the ECS Executor Airflow documentation for more details.
+
+# If you would like to use an alternative method of authentication, feel free to make the
+# necessary changes to this file.
+
+# Use these arguments to provide AWS authentication information
+ARG aws_access_key_id
+ARG aws_secret_access_key
+ARG aws_default_region
+ARG aws_session_token
+
+ENV AWS_ACCESS_KEY_ID=$aws_access_key_id
+ENV AWS_SECRET_ACCESS_KEY=$aws_secret_access_key
+ENV AWS_DEFAULT_REGION=$aws_default_region
+ENV AWS_SESSION_TOKEN=$aws_session_token
+
+
+## Loading DAGs
+# This Dockerfile supports 2 ways to load DAGs onto the container.
+# One is to upload all the DAGs onto an S3 bucket, and then
+# download them onto the container. The other is to copy a local folder with
+# the DAGs onto the container.
+# If you would like to use an alternative method of loading DAGs, feel free to make the
+# necessary changes to this file.
+
+ARG host_dag_path=./dags
+ENV HOST_DAG_PATH=$host_dag_path
+ARG container_dag_path=/opt/airflow/dags
+ENV CONTAINER_DAG_PATH=$container_dag_path
+# Set host_dag_path to the path of the DAGs on the host
+# COPY --chown=airflow:root $HOST_DAG_PATH $CONTAINER_DAG_PATH
+
+
+# Use these arguments to load DAGs onto the container from S3
+ARG s3_url
+ENV S3_URL=$s3_url
+# If using S3 bucket as source of DAGs, uncommenting the next ENTRYPOINT command will overwrite this one.
+ENTRYPOINT ["/usr/bin/dumb-init", "--", "/home/airflow/entrypoint.sh"]
+
+# Uncomment the line if using S3 bucket as the source of DAGs
+# ENTRYPOINT ["/usr/bin/dumb-init", "--", "/install_dags_entrypoint.sh"]
diff --git a/airflow/providers/amazon/aws/executors/ecs/__init__.py b/airflow/providers/amazon/aws/executors/ecs/__init__.py
new file mode 100644
index 0000000000..e4d6575c79
--- /dev/null
+++ b/airflow/providers/amazon/aws/executors/ecs/__init__.py
@@ -0,0 +1,335 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+AWS ECS Executor.
+
+Each Airflow task gets delegated out to an Amazon ECS Task.
+"""
+
+from __future__ import annotations
+
+import time
+from collections import defaultdict, deque
+from copy import deepcopy
+from typing import TYPE_CHECKING
+
+from airflow.configuration import conf
+from airflow.executors.base_executor import BaseExecutor
+from airflow.providers.amazon.aws.executors.ecs.boto_schema import BotoDescribeTasksSchema, BotoRunTaskSchema
+from airflow.providers.amazon.aws.executors.ecs.utils import (
+    CONFIG_DEFAULTS,
+    CONFIG_GROUP_NAME,
+    AllEcsConfigKeys,
+    EcsExecutorException,
+    EcsQueuedTask,
+    EcsTaskCollection,
+)
+from airflow.utils.state import State
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstanceKey
+    from airflow.providers.amazon.aws.executors.ecs.utils import (
+        CommandType,
+        ExecutorConfigType,
+    )
+
+
+class AwsEcsExecutor(BaseExecutor):
+    """
+    Executes the provided Airflow command on an ECS instance.
+
+    The Airflow Scheduler creates a shell command, and passes it to the executor. This ECS Executor
+    runs said Airflow command on a remote Amazon ECS Cluster with a task-definition configured to
+    launch the same containers as the Scheduler. It then periodically checks in with the launched
+    tasks (via task ARNs) to determine the status.
+
+    This allows individual tasks to specify CPU, memory, GPU, env variables, etc. When initializing a task,
+    there's an option for "executor config" which should be a dictionary with keys that match the
+    ``ContainerOverride`` definition per AWS documentation (see link below).
+
+    Prerequisite: proper configuration of Boto3 library
+    .. seealso:: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html for
+    authentication and access-key management. You can store an environmental variable, setup aws config from
+    console, or use IAM roles.
+
+    .. seealso:: https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerOverride.html for an
+     Airflow TaskInstance's executor_config.
+    """
+
+    # Maximum number of retries to run an ECS task.
+    MAX_RUN_TASK_ATTEMPTS = conf.get(
+        CONFIG_GROUP_NAME,
+        AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS,
+        fallback=CONFIG_DEFAULTS[AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS],
+    )
+
+    # AWS limits the maximum number of ARNs in the describe_tasks function.
+    DESCRIBE_TASKS_BATCH_SIZE = 99
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.active_workers: EcsTaskCollection = EcsTaskCollection()
+        self.pending_tasks: deque = deque()
+
+        self.cluster = conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.CLUSTER)
+        self.container_name = conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.CONTAINER_NAME)
+        aws_conn_id = conf.get(
+            CONFIG_GROUP_NAME,
+            AllEcsConfigKeys.AWS_CONN_ID,
+            fallback=CONFIG_DEFAULTS[AllEcsConfigKeys.AWS_CONN_ID],
+        )
+        region_name = conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.REGION_NAME)
+        from airflow.providers.amazon.aws.hooks.ecs import EcsHook
+
+        self.ecs = EcsHook(aws_conn_id=aws_conn_id, region_name=region_name).conn
+        self.run_task_kwargs = self._load_run_kwargs()
+
+    def sync(self):
+        try:
+            self.sync_running_tasks()
+            self.attempt_task_runs()
+        except Exception:
+            # We catch any and all exceptions because otherwise they would bubble
+            # up and kill the scheduler process
+            self.log.exception("Failed to sync %s", self.__class__.__name__)
+
+    def sync_running_tasks(self):
+        """Checks and update state on all running tasks."""
+        all_task_arns = self.active_workers.get_all_arns()
+        if not all_task_arns:
+            self.log.debug("No active tasks, skipping sync")
+            return
+
+        describe_tasks_response = self.__describe_tasks(all_task_arns)
+        self.log.debug("Active Workers: %s", describe_tasks_response)
+
+        if describe_tasks_response["failures"]:
+            for failure in describe_tasks_response["failures"]:
+                self.__handle_failed_task(failure["arn"], failure["reason"])
+
+        updated_tasks = describe_tasks_response["tasks"]
+        for task in updated_tasks:
+            self.__update_running_task(task)
+
+    def __update_running_task(self, task):
+        self.active_workers.update_task(task)
+        # Get state of current task.
+        task_state = task.get_task_state()
+        task_key = self.active_workers.arn_to_key[task.task_arn]
+        # Mark finished tasks as either a success/failure.
+        if task_state == State.FAILED:
+            self.fail(task_key)
+        elif task_state == State.SUCCESS:
+            self.success(task_key)
+        elif task_state == State.REMOVED:
+            self.__handle_failed_task(task.task_arn, task.stopped_reason)
+        if task_state in (State.FAILED, State.SUCCESS):
+            self.log.debug("Task %s marked as %s after running on %s", 
task_key, task_state, task.task_arn)
+            self.active_workers.pop_by_key(task_key)
+
+    def __describe_tasks(self, task_arns):
+        all_task_descriptions = {"tasks": [], "failures": []}
+        for i in range(0, len(task_arns), self.DESCRIBE_TASKS_BATCH_SIZE):
+            batched_task_arns = task_arns[i : i + self.DESCRIBE_TASKS_BATCH_SIZE]
+            if not batched_task_arns:
+                continue
+            boto_describe_tasks = self.ecs.describe_tasks(tasks=batched_task_arns, cluster=self.cluster)
+            describe_tasks_response = BotoDescribeTasksSchema().load(boto_describe_tasks)
+
+            all_task_descriptions["tasks"].extend(describe_tasks_response["tasks"])
+            all_task_descriptions["failures"].extend(describe_tasks_response["failures"])
+        return all_task_descriptions
+
+    def __handle_failed_task(self, task_arn: str, reason: str):
+        """If an API failure occurs, the task is rescheduled."""
+        task_key = self.active_workers.arn_to_key[task_arn]
+        task_info = self.active_workers.info_by_key(task_key)
+        task_cmd = task_info.cmd
+        queue = task_info.queue
+        exec_info = task_info.config
+        failure_count = self.active_workers.failure_count_by_key(task_key)
+        if int(failure_count) < int(self.__class__.MAX_RUN_TASK_ATTEMPTS):
+            self.log.warning(
+                "Task %s has failed due to %s. Failure %s out of %s occurred 
on %s. Rescheduling.",
+                task_key,
+                reason,
+                failure_count,
+                self.__class__.MAX_RUN_TASK_ATTEMPTS,
+                task_arn,
+            )
+            self.active_workers.increment_failure_count(task_key)
+            self.pending_tasks.appendleft(
+                EcsQueuedTask(task_key, task_cmd, queue, exec_info, 
failure_count + 1)
+            )
+        else:
+            self.log.error(
+                "Task %s has failed a maximum of %s times. Marking as failed", 
task_key, failure_count
+            )
+            self.active_workers.pop_by_key(task_key)
+            self.fail(task_key)
+
+    def attempt_task_runs(self):
+        """
+        Takes tasks from the pending_tasks queue, and attempts to find an instance to run them on.
+
+        If the launch type is EC2, this will attempt to place tasks on empty EC2 instances.  If
+            there are no EC2 instances available, no task is placed and this function will be
+            called again in the next heart-beat.
+
+        If the launch type is FARGATE, this will run the tasks on new AWS Fargate instances.
+        """
+        queue_len = len(self.pending_tasks)
+        failure_reasons = defaultdict(int)
+        for _ in range(queue_len):
+            ecs_task = self.pending_tasks.popleft()
+            task_key = ecs_task.key
+            cmd = ecs_task.command
+            queue = ecs_task.queue
+            exec_config = ecs_task.executor_config
+            attempt_number = ecs_task.attempt_number
+            _failure_reasons = []
+            try:
+                run_task_response = self._run_task(task_key, cmd, queue, exec_config)
+            except Exception as e:
+                # Failed to even get a response back from the Boto3 API or something else went
+                # wrong.  For any possible failure we want to add the exception reasons to the
+                # failure list so that it is logged to the user and most importantly the task is
+                # added back to the pending list to be retried later.
+                _failure_reasons.append(str(e))
+            else:
+                # We got a response back, check if there were failures. If so, add them to the
+                # failures list so that it is logged to the user and most importantly the task
+                # is added back to the pending list to be retried later.
+                if run_task_response["failures"]:
+                    _failure_reasons.extend([f["reason"] for f in run_task_response["failures"]])
+
+            if _failure_reasons:
+                for reason in _failure_reasons:
+                    failure_reasons[reason] += 1
+                # Make sure the number of attempts does not exceed MAX_RUN_TASK_ATTEMPTS
+                if int(attempt_number) <= int(self.__class__.MAX_RUN_TASK_ATTEMPTS):
+                    ecs_task.attempt_number += 1
+                    self.pending_tasks.appendleft(ecs_task)
+                else:
+                    self.log.error(
+                        "Task %s has failed a maximum of %s times. Marking as failed",
+                        task_key,
+                        attempt_number,
+                    )
+                    self.fail(task_key)
+            elif not run_task_response["tasks"]:
+                self.log.error("ECS RunTask Response: %s", run_task_response)
+                raise EcsExecutorException(
+                    "No failures and no tasks provided in response. This 
should never happen."
+                )
+            else:
+                task = run_task_response["tasks"][0]
+                self.active_workers.add_task(task, task_key, queue, cmd, 
exec_config, attempt_number)
+        if failure_reasons:
+            self.log.error(
+                "Pending tasks failed to launch for the following reasons: %s. 
Will retry later.",
+                dict(failure_reasons),
+            )
+
+    def _run_task(
+        self, task_id: TaskInstanceKey, cmd: CommandType, queue: str, exec_config: ExecutorConfigType
+    ):
+        """
+        Run a queued-up Airflow task.
+
+        Not to be confused with execute_async() which inserts tasks into the queue.
+        The command and executor config will be placed in the container-override
+        section of the JSON request before calling Boto3's "run_task" function.
+        """
+        run_task_api = self._run_task_kwargs(task_id, cmd, queue, exec_config)
+        boto_run_task = self.ecs.run_task(**run_task_api)
+        run_task_response = BotoRunTaskSchema().load(boto_run_task)
+        return run_task_response
+
+    def _run_task_kwargs(
+        self, task_id: TaskInstanceKey, cmd: CommandType, queue: str, exec_config: ExecutorConfigType
+    ) -> dict:
+        """
+        Overrides the Airflow command to update the container overrides so kwargs are specific to this task.
+
+        One last chance to modify Boto3's "run_task" kwarg params before it gets passed into the Boto3 client.
+        """
+        run_task_api = deepcopy(self.run_task_kwargs)
+        container_override = self.get_container(run_task_api["overrides"]["containerOverrides"])
+        container_override["command"] = cmd
+        container_override.update(exec_config)
+
+        # Inject the env variable to configure logging for containerized execution environment
+        if "environment" not in container_override:
+            container_override["environment"] = []
+        container_override["environment"].append({"name": "AIRFLOW_IS_EXECUTOR_CONTAINER", "value": "true"})
+
+        return run_task_api
+
+    def execute_async(self, key: TaskInstanceKey, command: CommandType, queue=None, executor_config=None):
+        """Save the task to be executed in the next sync by inserting the commands into a queue."""
+        if executor_config and ("name" in executor_config or "command" in executor_config):
+            raise ValueError('Executor Config should never override "name" or "command"')
+        self.pending_tasks.append(EcsQueuedTask(key, command, queue, executor_config or {}, 1))
+
+    def end(self, heartbeat_interval=10):
+        """Waits for all currently running tasks to end, and doesn't launch 
any tasks."""
+        try:
+            while True:
+                self.sync()
+                if not self.active_workers:
+                    break
+                time.sleep(heartbeat_interval)
+        except Exception:
+            # We catch any and all exceptions because otherwise they would bubble
+            # up and kill the scheduler process.
+            self.log.exception("Failed to end %s", self.__class__.__name__)
+
+    def terminate(self):
+        """Kill all ECS processes by calling Boto3's StopTask API."""
+        try:
+            for arn in self.active_workers.get_all_arns():
+                self.ecs.stop_task(
+                    cluster=self.cluster, task=arn, reason="Airflow Executor received a SIGTERM"
+                )
+            self.end()
+        except Exception:
+            # We catch any and all exceptions because otherwise they would bubble
+            # up and kill the scheduler process.
+            self.log.exception("Failed to terminate %s", self.__class__.__name__)
+
+    def _load_run_kwargs(self) -> dict:
+        from airflow.providers.amazon.aws.executors.ecs.ecs_executor_config import build_task_kwargs
+
+        ecs_executor_run_task_kwargs = build_task_kwargs()
+
+        try:
+            self.get_container(ecs_executor_run_task_kwargs["overrides"]["containerOverrides"])["command"]
+        except KeyError:
+            raise KeyError(
+                "Rendered JSON template does not contain key "
+                '"overrides[containerOverrides][containers][x][command]"'
+            )
+        return ecs_executor_run_task_kwargs
+
+    def get_container(self, container_list):
+        """Searches task list for core Airflow container."""
+        for container in container_list:
+            if container["name"] == self.container_name:
+                return container
+        raise KeyError(f"No such container found by container name: 
{self.container_name}")
diff --git a/airflow/providers/amazon/aws/executors/ecs/boto_schema.py b/airflow/providers/amazon/aws/executors/ecs/boto_schema.py
new file mode 100644
index 0000000000..500887e5d5
--- /dev/null
+++ b/airflow/providers/amazon/aws/executors/ecs/boto_schema.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+AWS ECS Executor Boto Schema.
+
+Schemas for easily and consistently parsing boto responses.
+"""
+
+from __future__ import annotations
+
+from marshmallow import EXCLUDE, Schema, fields, post_load
+
+
+class BotoContainerSchema(Schema):
+    """
+    Botocore Serialization Object for ECS ``Container`` shape.
+
+    Note that there are many more parameters, but the executor only needs the members listed below.
+    """
+
+    exit_code = fields.Integer(data_key="exitCode")
+    last_status = fields.String(data_key="lastStatus")
+    name = fields.String(required=True)
+
+    class Meta:
+        """Options object for a Schema. See Schema.Meta for more details and 
valid values."""
+
+        unknown = EXCLUDE
+
+
+class BotoTaskSchema(Schema):
+    """
+    Botocore Serialization Object for ECS ``Task`` shape.
+
+    Note that there are many more parameters, but the executor only needs the members listed below.
+    """
+
+    task_arn = fields.String(data_key="taskArn", required=True)
+    last_status = fields.String(data_key="lastStatus", required=True)
+    desired_status = fields.String(data_key="desiredStatus", required=True)
+    containers = fields.List(fields.Nested(BotoContainerSchema), required=True)
+    started_at = fields.Field(data_key="startedAt")
+    stopped_reason = fields.String(data_key="stoppedReason")
+
+    @post_load
+    def make_task(self, data, **kwargs):
+        """Overwrites marshmallow load() to return an instance of 
EcsExecutorTask instead of a dictionary."""
+        # Imported here to avoid circular import.
+        from airflow.providers.amazon.aws.executors.ecs.utils import 
EcsExecutorTask
+
+        return EcsExecutorTask(**data)
+
+    class Meta:
+        """Options object for a Schema. See Schema.Meta for more details and 
valid values."""
+
+        unknown = EXCLUDE
+
+
+class BotoFailureSchema(Schema):
+    """Botocore Serialization Object for ECS ``Failure`` Shape."""
+
+    arn = fields.String()
+    reason = fields.String()
+
+    class Meta:
+        """Options object for a Schema. See Schema.Meta for more details and 
valid values."""
+
+        unknown = EXCLUDE
+
+
+class BotoRunTaskSchema(Schema):
+    """Botocore Serialization Object for ECS ``RunTask`` Operation output."""
+
+    tasks = fields.List(fields.Nested(BotoTaskSchema), required=True)
+    failures = fields.List(fields.Nested(BotoFailureSchema), required=True)
+
+    class Meta:
+        """Options object for a Schema. See Schema.Meta for more details and 
valid values."""
+
+        unknown = EXCLUDE
+
+
+class BotoDescribeTasksSchema(Schema):
+    """Botocore Serialization Object for ECS ``DescribeTask`` Operation 
output."""
+
+    tasks = fields.List(fields.Nested(BotoTaskSchema), required=True)
+    failures = fields.List(fields.Nested(BotoFailureSchema), required=True)
+
+    class Meta:
+        """Options object for a Schema. See Schema.Meta for more details and 
valid values."""
+
+        unknown = EXCLUDE
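
To make the schemas above concrete, here is a hedged sketch of loading a trimmed-down, made-up
``run_task`` response; unknown fields are dropped because of ``unknown = EXCLUDE``, and each task
dictionary is converted into an ``EcsExecutorTask`` by the ``post_load`` hook:

    from airflow.providers.amazon.aws.executors.ecs.boto_schema import BotoRunTaskSchema

    # Illustrative sample only; field names follow the ECS API shapes referenced above.
    sample_response = {
        "tasks": [
            {
                "taskArn": "arn:aws:ecs:us-east-1:123456789012:task/example",
                "lastStatus": "PROVISIONING",
                "desiredStatus": "RUNNING",
                "containers": [{"name": "example-airflow-container", "lastStatus": "PENDING"}],
                "launchType": "FARGATE",  # not declared on the schema, so it is excluded
            }
        ],
        "failures": [],
    }

    loaded = BotoRunTaskSchema().load(sample_response)
    task = loaded["tasks"][0]  # an EcsExecutorTask instance rather than a plain dict
    print(task.task_arn, task.last_status, task.desired_status)
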
diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py
new file mode 100644
index 0000000000..f0fa97852a
--- /dev/null
+++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+AWS ECS Executor configuration.
+
+This is the configuration for calling the ECS ``run_task`` function. The AWS ECS Executor calls
+Boto3's ``run_task(**kwargs)`` function with the kwargs templated by this dictionary. See the URL
+below for documentation on the parameters accepted by the Boto3 run_task function.
+
+.. seealso::
+    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task
+
+"""
+
+from __future__ import annotations
+
+import json
+from json import JSONDecodeError
+
+from airflow.configuration import conf
+from airflow.providers.amazon.aws.executors.ecs.utils import (
+    CONFIG_GROUP_NAME,
+    AllEcsConfigKeys,
+    RunTaskKwargsConfigKeys,
+    camelize_dict_keys,
+    parse_assign_public_ip,
+)
+from airflow.utils.helpers import prune_dict
+
+
+def _fetch_templated_kwargs() -> dict[str, str]:
+    run_task_kwargs_value = conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.RUN_TASK_KWARGS, fallback=dict())
+    return json.loads(str(run_task_kwargs_value))
+
+
+def _fetch_config_values() -> dict[str, str]:
+    return prune_dict(
+        {key: conf.get(CONFIG_GROUP_NAME, key, fallback=None) for key in RunTaskKwargsConfigKeys()}
+    )
+
+
+def build_task_kwargs() -> dict:
+    # This will put some kwargs at the root of the dictionary that do NOT belong there. However,
+    # the code below expects them to be there and will rearrange them as necessary.
+    task_kwargs = _fetch_config_values()
+    task_kwargs.update(_fetch_templated_kwargs())
+
+    # There can only be 1 count of these containers
+    task_kwargs["count"] = 1  # type: ignore
+    # There could be a generic approach to the below, but likely more convoluted than just manually ensuring
+    # the one nested config we need to update is present. If we need to override more options in the future we
+    # should revisit this.
+    if "overrides" not in task_kwargs:
+        task_kwargs["overrides"] = {}  # type: ignore
+    if "containerOverrides" not in task_kwargs["overrides"]:
+        task_kwargs["overrides"]["containerOverrides"] = [{}]  # type: ignore
+    task_kwargs["overrides"]["containerOverrides"][0]["name"] = 
task_kwargs.pop(  # type: ignore
+        AllEcsConfigKeys.CONTAINER_NAME
+    )
+    # The executor will overwrite the 'command' property during execution. 
Must always be the first container!
+    task_kwargs["overrides"]["containerOverrides"][0]["command"] = []  # type: 
ignore
+
+    if any(
+        [
+            subnets := task_kwargs.pop(AllEcsConfigKeys.SUBNETS, None),
+            security_groups := task_kwargs.pop(AllEcsConfigKeys.SECURITY_GROUPS, None),
+            # Surrounding parens are for the walrus operator to function correctly along with the None check
+            (assign_public_ip := task_kwargs.pop(AllEcsConfigKeys.ASSIGN_PUBLIC_IP, None)) is not None,
+        ]
+    ):
+        network_config = prune_dict(
+            {
+                "awsvpcConfiguration": {
+                    "subnets": str(subnets).split(",") if subnets else None,
+                    "securityGroups": str(security_groups).split(",") if 
security_groups else None,
+                    "assignPublicIp": parse_assign_public_ip(assign_public_ip),
+                }
+            }
+        )
+
+        if "subnets" not in network_config["awsvpcConfiguration"]:
+            raise ValueError("At least one subnet is required to run a task.")
+
+        task_kwargs["networkConfiguration"] = network_config
+
+    task_kwargs = camelize_dict_keys(task_kwargs)
+
+    try:
+        json.loads(json.dumps(task_kwargs))
+    except JSONDecodeError:
+        raise ValueError("AWS ECS Executor config values must be JSON 
serializable.")
+
+    return task_kwargs
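
For orientation, a hedged sketch of the kind of dictionary ``build_task_kwargs()`` assembles; the
cluster, task definition, container name, and subnet values are placeholders, and the exact shape
depends on which options are configured:

    import json

    # Approximate result only; real values come from the aws_ecs_executor config section.
    run_task_kwargs = {
        "cluster": "my-ecs-cluster",
        "taskDefinition": "my-task-def:1",
        "launchType": "FARGATE",
        "platformVersion": "LATEST",
        "count": 1,
        "overrides": {
            # The executor fills in "command" per task; the name comes from the container_name option.
            "containerOverrides": [{"name": "my-airflow-container", "command": []}],
        },
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": ["subnet-aaaa", "subnet-bbbb"],
                "assignPublicIp": "DISABLED",
            }
        },
    }

    # build_task_kwargs() ends with the same round-trip to ensure the values are JSON serializable.
    json.loads(json.dumps(run_task_kwargs))

The executor ultimately passes this dictionary to Boto3 as ``run_task(**kwargs)``.
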
diff --git a/airflow/providers/amazon/aws/executors/ecs/utils.py b/airflow/providers/amazon/aws/executors/ecs/utils.py
new file mode 100644
index 0000000000..4b5c69cb68
--- /dev/null
+++ b/airflow/providers/amazon/aws/executors/ecs/utils.py
@@ -0,0 +1,264 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+AWS ECS Executor Utilities.
+
+Data classes and utility functions used by the ECS executor.
+"""
+
+from __future__ import annotations
+
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Callable, Dict, List
+
+from inflection import camelize
+
+from airflow.utils.state import State
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstanceKey
+
+CommandType = List[str]
+ExecutorConfigFunctionType = Callable[[CommandType], dict]
+ExecutorConfigType = Dict[str, Any]
+
+CONFIG_GROUP_NAME = "aws_ecs_executor"
+
+CONFIG_DEFAULTS = {
+    "conn_id": "aws_default",
+    "max_run_task_attempts": "3",
+    "assign_public_ip": "False",
+    "launch_type": "FARGATE",
+    "platform_version": "LATEST",
+}
+
+
+@dataclass
+class EcsQueuedTask:
+    """Represents an ECS task that is queued. The task will be run in the next 
heartbeat."""
+
+    key: TaskInstanceKey
+    command: CommandType
+    queue: str
+    executor_config: ExecutorConfigType
+    attempt_number: int
+
+
+@dataclass
+class EcsTaskInfo:
+    """Contains information about a currently running ECS task."""
+
+    cmd: CommandType
+    queue: str
+    config: ExecutorConfigType
+
+
+class BaseConfigKeys:
+    """Base Implementation of the Config Keys class. Implements iteration for 
child classes to inherit."""
+
+    def __iter__(self):
+        return iter({value for (key, value) in self.__class__.__dict__.items() if not key.startswith("__")})
+
+
+class RunTaskKwargsConfigKeys(BaseConfigKeys):
+    """Keys loaded into the config which are valid ECS run_task kwargs."""
+
+    ASSIGN_PUBLIC_IP = "assign_public_ip"
+    CLUSTER = "cluster"
+    LAUNCH_TYPE = "launch_type"
+    PLATFORM_VERSION = "platform_version"
+    SECURITY_GROUPS = "security_groups"
+    SUBNETS = "subnets"
+    TASK_DEFINITION = "task_definition"
+    CONTAINER_NAME = "container_name"
+
+
+class AllEcsConfigKeys(RunTaskKwargsConfigKeys):
+    """All keys loaded into the config which are related to the ECS 
Executor."""
+
+    MAX_RUN_TASK_ATTEMPTS = "max_run_task_attempts"
+    AWS_CONN_ID = "conn_id"
+    RUN_TASK_KWARGS = "run_task_kwargs"
+    REGION_NAME = "region_name"
+
+
+class EcsExecutorException(Exception):
+    """Thrown when something unexpected has occurred within the ECS 
ecosystem."""
+
+
+class EcsExecutorTask:
+    """Data Transfer Object for an ECS Fargate Task."""
+
+    def __init__(
+        self,
+        task_arn: str,
+        last_status: str,
+        desired_status: str,
+        containers: list[dict[str, Any]],
+        started_at: Any | None = None,
+        stopped_reason: str | None = None,
+    ):
+        self.task_arn = task_arn
+        self.last_status = last_status
+        self.desired_status = desired_status
+        self.containers = containers
+        self.started_at = started_at
+        self.stopped_reason = stopped_reason
+
+    def get_task_state(self) -> str:
+        """
+        This is the primary logic that handles state in an ECS task.
+
+        It will determine if a status is:
+            QUEUED - Task is being provisioned.
+            RUNNING - Task is launched on ECS.
+            REMOVED - Task provisioning has failed for some reason. See `stopped_reason`.
+            FAILED - Task is completed and at least one container has failed.
+            SUCCESS - Task is completed and all containers have succeeded.
+        """
+        if self.last_status == "RUNNING":
+            return State.RUNNING
+        elif self.desired_status == "RUNNING":
+            return State.QUEUED
+        is_finished = self.desired_status == "STOPPED"
+        has_exit_codes = all(["exit_code" in x for x in self.containers])
+        # Sometimes ECS tasks may time out.
+        if not self.started_at and is_finished:
+            return State.REMOVED
+        if not is_finished or not has_exit_codes:
+            return State.RUNNING
+        all_containers_succeeded = all([x["exit_code"] == 0 for x in self.containers])
+        return State.SUCCESS if all_containers_succeeded else State.FAILED
+
+    def __repr__(self):
+        return f"({self.task_arn}, {self.last_status}->{self.desired_status}, 
{self.get_task_state()})"
+
+
+class EcsTaskCollection:
+    """A five-way dictionary between Airflow task ids, Airflow cmds, ECS ARNs, 
and ECS task objects."""
+
+    def __init__(self):
+        self.key_to_arn: dict[TaskInstanceKey, str] = {}
+        self.arn_to_key: dict[str, TaskInstanceKey] = {}
+        self.tasks: dict[str, EcsExecutorTask] = {}
+        self.key_to_failure_counts: dict[TaskInstanceKey, int] = defaultdict(int)
+        self.key_to_task_info: dict[TaskInstanceKey, EcsTaskInfo] = {}
+
+    def add_task(
+        self,
+        task: EcsExecutorTask,
+        airflow_task_key: TaskInstanceKey,
+        queue: str,
+        airflow_cmd: CommandType,
+        exec_config: ExecutorConfigType,
+        attempt_number: int,
+    ):
+        """Adds a task to the collection."""
+        arn = task.task_arn
+        self.tasks[arn] = task
+        self.key_to_arn[airflow_task_key] = arn
+        self.arn_to_key[arn] = airflow_task_key
+        self.key_to_task_info[airflow_task_key] = EcsTaskInfo(airflow_cmd, queue, exec_config)
+        self.key_to_failure_counts[airflow_task_key] = attempt_number
+
+    def update_task(self, task: EcsExecutorTask):
+        """Updates the state of the given task based on task ARN."""
+        self.tasks[task.task_arn] = task
+
+    def task_by_key(self, task_key: TaskInstanceKey) -> EcsExecutorTask:
+        """Get a task by Airflow Instance Key."""
+        arn = self.key_to_arn[task_key]
+        return self.task_by_arn(arn)
+
+    def task_by_arn(self, arn) -> EcsExecutorTask:
+        """Get a task by AWS ARN."""
+        return self.tasks[arn]
+
+    def pop_by_key(self, task_key: TaskInstanceKey) -> EcsExecutorTask:
+        """Deletes task from collection based off of Airflow Task Instance 
Key."""
+        arn = self.key_to_arn[task_key]
+        task = self.tasks[arn]
+        del self.key_to_arn[task_key]
+        del self.key_to_task_info[task_key]
+        del self.arn_to_key[arn]
+        del self.tasks[arn]
+        if task_key in self.key_to_failure_counts:
+            del self.key_to_failure_counts[task_key]
+        return task
+
+    def get_all_arns(self) -> list[str]:
+        """Get all AWS ARNs in collection."""
+        return list(self.key_to_arn.values())
+
+    def get_all_task_keys(self) -> list[TaskInstanceKey]:
+        """Get all Airflow Task Keys in collection."""
+        return list(self.key_to_arn.keys())
+
+    def failure_count_by_key(self, task_key: TaskInstanceKey) -> int:
+        """Get the number of times a task has failed given an Airflow Task 
Key."""
+        return self.key_to_failure_counts[task_key]
+
+    def increment_failure_count(self, task_key: TaskInstanceKey):
+        """Increment the failure counter given an Airflow Task Key."""
+        self.key_to_failure_counts[task_key] += 1
+
+    def info_by_key(self, task_key: TaskInstanceKey) -> EcsTaskInfo:
+        """Get the Airflow Command given an Airflow task key."""
+        return self.key_to_task_info[task_key]
+
+    def __getitem__(self, value):
+        """Gets a task by AWS ARN."""
+        return self.task_by_arn(value)
+
+    def __len__(self):
+        """Determines the number of tasks in collection."""
+        return len(self.tasks)
+
+
+def _recursive_flatten_dict(nested_dict):
+    """
+    Recursively unpack a nested dict and return it as a flat dict.
+
+    For example, _flatten_dict({'a': 'a', 'b': 'b', 'c': {'d': 'd'}}) returns {'a': 'a', 'b': 'b', 'd': 'd'}.
+    """
+    items = []
+    for key, value in nested_dict.items():
+        if isinstance(value, dict):
+            items.extend(_recursive_flatten_dict(value).items())
+        else:
+            items.append((key, value))
+    return dict(items)
+
+
+def parse_assign_public_ip(assign_public_ip):
+    """Convert "assign_public_ip" from True/False to ENABLE/DISABLE."""
+    return "ENABLED" if assign_public_ip == "True" else "DISABLED"
+
+
+def camelize_dict_keys(nested_dict) -> dict:
+    """Accept a potentially nested dictionary and recursively convert all keys 
into camelCase."""
+    result = {}
+    for key, value in nested_dict.items():
+        new_key = camelize(key, uppercase_first_letter=False)
+        if isinstance(value, dict) and (key.lower() != "tags"):
+            # The key name on tags can be whatever the user wants, and we should not mess with them.
+            result[new_key] = camelize_dict_keys(value)
+        else:
+            result[new_key] = nested_dict[key]
+    return result
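
A brief illustration of the helpers above, using an arbitrary example dictionary:
``camelize_dict_keys`` converts keys to camelCase recursively but leaves the contents of a "tags"
mapping untouched, and ``parse_assign_public_ip`` maps the string flag onto the values the ECS API
expects:

    from airflow.providers.amazon.aws.executors.ecs.utils import camelize_dict_keys, parse_assign_public_ip

    example = {
        "launch_type": "FARGATE",
        "network_configuration": {"awsvpc_configuration": {"assign_public_ip": "ENABLED"}},
        "tags": {"cost_center": "data-eng"},  # keys under "tags" are preserved as-is
    }
    print(camelize_dict_keys(example))
    # {'launchType': 'FARGATE',
    #  'networkConfiguration': {'awsvpcConfiguration': {'assignPublicIp': 'ENABLED'}},
    #  'tags': {'cost_center': 'data-eng'}}

    print(parse_assign_public_ip("True"))   # ENABLED
    print(parse_assign_public_ip("False"))  # DISABLED
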
diff --git a/airflow/providers/amazon/aws/hooks/base_aws.py b/airflow/providers/amazon/aws/hooks/base_aws.py
index 00e8db657f..d608a87b8c 100644
--- a/airflow/providers/amazon/aws/hooks/base_aws.py
+++ b/airflow/providers/amazon/aws/hooks/base_aws.py
@@ -490,7 +490,7 @@ class AwsGenericHook(BaseHook, Generic[BaseAwsConnection]):
         return manager.providers[hook.package_name].version
 
     @staticmethod
-    def _find_class_name(target_function_name: str) -> str:
+    def _find_operator_class_name(target_function_name: str) -> str | None:
         """Given a frame off the stack, return the name of the class that made 
the call.
 
         This method may raise a ValueError or an IndexError. The caller is
@@ -499,7 +499,11 @@ class AwsGenericHook(BaseHook, Generic[BaseAwsConnection]):
         stack = inspect.stack()
         # Find the index of the most recent frame which called the provided function name
         # and pull that frame off the stack.
-        target_frame = next(frame for frame in stack if frame.function == target_function_name)[0]
+        target_frames = [frame for frame in stack if frame.function == target_function_name]
+        if target_frames:
+            target_frame = target_frames[0][0]
+        else:
+            return None
         # Get the local variables for that frame.
         frame_variables = target_frame.f_locals["self"]
         # Get the class object for that frame.
@@ -507,14 +511,32 @@ class AwsGenericHook(BaseHook, Generic[BaseAwsConnection]):
         # Return the name of the class object.
         return frame_class_object.__name__
 
+    @staticmethod
+    def _find_executor_class_name() -> str | None:
+        """Inspect the call stack looking for any executor classes and 
returning the first found."""
+        stack = inspect.stack()
+        # Fetch class objects on all frames, looking for one containing an 
executor (since it
+        # will inherit from BaseExecutor)
+        for frame in stack:
+            classes = []
+            for name, obj in frame[0].f_globals.items():
+                if inspect.isclass(obj):
+                    classes.append(name)
+            if "BaseExecutor" in classes:
+                return classes[-1]
+        return None
+
     @return_on_error("Unknown")
     def _get_caller(self, target_function_name: str = "execute") -> str:
-        """Given a function name, walk the stack and return the name of the 
class which called it last."""
-        caller = self._find_class_name(target_function_name)
+        """Try to determine the caller of this hook. Whether that be an AWS 
Operator, Sensor or Executor."""
+        caller = self._find_operator_class_name(target_function_name)
         if caller == "BaseSensorOperator":
             # If the result is a BaseSensorOperator, then look for whatever 
last called "poke".
-            return self._get_caller("poke")
-        return caller
+            caller = self._find_operator_class_name("poke")
+        if not caller:
+            # Check if we can find an executor
+            caller = self._find_executor_class_name()
+        return caller if caller else "Unknown"
 
     @staticmethod
     @return_on_error("00000000-0000-0000-0000-000000000000")
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 7cbd403fc6..77c9482c71 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -83,6 +83,7 @@ dependencies:
   # NOTE!!! BOTOCORE version is always shifted by 3 MINOR VERSIONS from boto3
   # See https://github.com/boto/boto3/issues/2702
   - botocore>=1.31.0
+  - inflection>=0.5.1
   - watchtower~=3.0.1
   - jsonpath_ng>=1.5.3
   - redshift_connector>=2.0.888
@@ -763,3 +764,115 @@ config:
         version_added: 8.7.2
         example: airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize
         default: airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy
+  aws_ecs_executor:
+    description: |
+      This section only applies if you are using the AwsEcsExecutor in
+      Airflow's ``[core]`` configuration.
+      For more information on any of these execution parameters, see the link below:
+      https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs/client/run_task.html
+      For boto3 credential management, see
+      https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
+    options:
+      conn_id:
+        description: |
+          The Airflow connection (i.e. credentials) used by the ECS executor to make API calls to AWS ECS.
+        version_added: "8.10"
+        type: string
+        example: "aws_default"
+        default: "aws_default"
+      region_name:
+        description: |
+          The name of the AWS Region where Amazon ECS is configured. Required.
+        version_added: "8.10"
+        type: string
+        example: "us-east-1"
+        default: ~
+      assign_public_ip:
+        description: |
+          Whether to assign a public IP address to the containers launched by the ECS executor.
+          For more info see url to Boto3 docs above.
+        version_added: "8.10"
+        type: boolean
+        example: "True"
+        default: "False"
+      cluster:
+        description: |
+          Name of the Amazon ECS Cluster. Required.
+        version_added: "8.10"
+        type: string
+        example: "ecs_executor_cluster"
+        default: ~
+      container_name:
+        description: |
+          Name of the container that will be used to execute Airflow tasks via the ECS executor.
+          The container should be specified in the ECS Task Definition and will receive an airflow
+          CLI command as an additional parameter to its entrypoint. For more info see url to Boto3
+          docs above. Required.
+        version_added: "8.10"
+        type: string
+        example: "ecs_executor_container"
+        default: ~
+      launch_type:
+        description: |
+          Launch type can either be 'FARGATE' OR 'EC2'. For more info see url to
+          Boto3 docs above.
+
+          If the launch type is EC2, the executor will attempt to place tasks on
+          empty EC2 instances. If there are no EC2 instances available, no task
+          is placed and this function will be called again in the next heart-beat.
+
+          If the launch type is FARGATE, this will run the tasks on new AWS Fargate
+          instances.
+        version_added: "8.10"
+        type: string
+        example: "FARGATE"
+        default: "FARGATE"
+      platform_version:
+        description: |
+          The platform version the task uses. A platform version is only specified
+          for tasks hosted on Fargate. If one isn't specified, the LATEST platform
+          version is used.
+        version_added: "8.10"
+        type: string
+        example: "1.4.0"
+        default: "LATEST"
+      security_groups:
+        description: |
+          The comma-separated IDs of the security groups associated with the task. If you
+          don't specify a security group, the default security group for the VPC is used.
+          There's a limit of 5 security groups. For more info see url to Boto3 docs above.
+        version_added: "8.10"
+        type: string
+        example: "sg-XXXX,sg-YYYY"
+        default: ~
+      subnets:
+        description: |
+          The comma-separated IDs of the subnets associated with the task or service.
+          There's a limit of 16 subnets. For more info see url to Boto3 docs above.
+        version_added: "8.10"
+        type: string
+        example: "subnet-XXXXXXXX,subnet-YYYYYYYY"
+        default: ~
+      task_definition:
+        description: |
+          The family and revision (family:revision) or full ARN of the task definition
+          to run. If a revision isn't specified, the latest ACTIVE revision is used.
+          For more info see url to Boto3 docs above.
+        version_added: "8.10"
+        type: string
+        example: executor_task_definition:LATEST
+        default: ~
+      max_run_task_attempts:
+        description: |
+          The maximum number of times the ECS Executor should attempt to run a task.
+        version_added: "8.10"
+        type: integer
+        example: "3"
+        default: "3"
+      run_task_kwargs:
+        description: |
+          A JSON string containing arguments to provide the ECS `run_task` API (see url above).
+        version_added: "8.10"
+        type: string
+        example: '{"tags": {"key": "schema", "value": "1.0"}}'
+        default: ~
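
To connect the new ``aws_ecs_executor`` section to a deployment, the options above can be supplied in
``airflow.cfg`` or as environment variables. A hedged sketch follows; the executor module path and the
option values are assumptions reusing the examples from the descriptions above:

    import os

    # Point Airflow's [core] executor at the class added in this commit (full module path assumed).
    os.environ["AIRFLOW__CORE__EXECUTOR"] = "airflow.providers.amazon.aws.executors.ecs.AwsEcsExecutor"

    # Options in the aws_ecs_executor section map to AIRFLOW__AWS_ECS_EXECUTOR__<OPTION_NAME>.
    os.environ["AIRFLOW__AWS_ECS_EXECUTOR__REGION_NAME"] = "us-east-1"
    os.environ["AIRFLOW__AWS_ECS_EXECUTOR__CLUSTER"] = "ecs_executor_cluster"
    os.environ["AIRFLOW__AWS_ECS_EXECUTOR__CONTAINER_NAME"] = "ecs_executor_container"
    os.environ["AIRFLOW__AWS_ECS_EXECUTOR__TASK_DEFINITION"] = "executor_task_definition:LATEST"
    os.environ["AIRFLOW__AWS_ECS_EXECUTOR__SUBNETS"] = "subnet-XXXXXXXX,subnet-YYYYYYYY"
    os.environ["AIRFLOW__AWS_ECS_EXECUTOR__RUN_TASK_KWARGS"] = '{"tags": {"key": "schema", "value": "1.0"}}'
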
diff --git a/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
new file mode 100644
index 0000000000..b09e6afff0
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
@@ -0,0 +1,594 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+.. warning::
+   The ECS Executor is alpha/experimental at the moment and may be subject to change without warning.
+
+
+================
+AWS ECS Executor
+================
+
+This is an Airflow executor powered by Amazon Elastic Container Service
+(ECS). Each task that Airflow schedules for execution is run within its
+own ECS container. Some benefits of an executor like this include:
+
+1. Task isolation: No task can be a noisy neighbor for another.
+   Resources like CPU, memory and disk are isolated to each individual
+   task. Any actions or failures which affect networking or fail the
+   entire container only affect the single task running in it. No single
+   user can overload the environment by triggering too many tasks,
+   because there are no shared workers.
+2. Customized environments: You can build different container images
+   which incorporate specific dependencies (such as system level
+   dependencies), binaries, or data required for a task to run.
+3. Cost effective: Compute resources only exist for the lifetime of the
+   Airflow task itself. This saves costs by not requiring
+   persistent/long lived workers ready at all times, which also need
+   maintenance and patching.
+
+For a quick start guide please see :ref:`here <setup_guide>`; it will
+get you up and running with a basic configuration.
+
+The below sections provide more generic details about configuration, the
+provided example Dockerfile and logging.
+
+.. _config-options:
+
+Config Options
+--------------
+
+There are a number of configuration options available, which can either
+be set directly in the airflow.cfg file under an "aws_ecs_executor"
+section or via environment variables using the
+``AIRFLOW__AWS_ECS_EXECUTOR__<OPTION_NAME>`` format, for example
+``AIRFLOW__AWS_ECS_EXECUTOR__CONTAINER_NAME = "myEcsContainer"``. For
+more information on how to set these options, see `Setting Configuration
+Options <https://airflow.apache.org/docs/apache-airflow/stable/howto/set-config.html>`__
+
+.. note::
+   Configuration options must be consistent across all the hosts/environments running the Airflow components (Scheduler, Webserver, ECS Task containers, etc). See `here <https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html>`__ for more details on setting configurations.
+
+In the case of conflicts, the order of precedence from lowest to highest is:
+
+1. Load default values for options which have defaults.
+2. Load any values explicitly provided through airflow.cfg or
+   environment variables. These are checked with Airflow's config
+   precedence.
+3. Load any values provided in the RUN_TASK_KWARGS option if one is
+   provided.
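+
+As a hedged illustration of this layering (the values below are placeholders, not
+recommendations), the executor effectively builds one dictionary from the three sources in that
+order, so later sources win:
+
+.. code-block:: python
+
+    import json
+
+    defaults = {"launch_type": "FARGATE", "platform_version": "LATEST"}
+    explicit_options = {"launch_type": "EC2"}  # from airflow.cfg or environment variables
+    run_task_kwargs = json.loads('{"tags": {"key": "schema", "value": "1.0"}}')  # RUN_TASK_KWARGS
+
+    effective = {**defaults, **explicit_options, **run_task_kwargs}
+    # {'launch_type': 'EC2', 'platform_version': 'LATEST', 'tags': {'key': 'schema', 'value': '1.0'}}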
+
+Required config options:
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+-  CLUSTER - Name of the Amazon ECS Cluster. Required.
+-  CONTAINER_NAME - Name of the container that will be used to execute
+   Airflow tasks via the ECS executor. The container should be specified
+   in the ECS Task Definition. Required.
+-  REGION - The name of the AWS Region where Amazon ECS is configured.
+   Required.
+
+Optional config options:
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+-  ASSIGN_PUBLIC_IP - Whether to assign a public IP address to the
+   containers launched by the ECS executor. Defaults to "False".
+-  CONN_ID - The Airflow connection (i.e. credentials) used by the ECS
+   executor to make API calls to AWS ECS. Defaults to "aws_default".
+-  LAUNCH_TYPE - Launch type can either be 'FARGATE' OR 'EC2'. Defaults
+   to "FARGATE".
+-  PLATFORM_VERSION - The platform version the ECS task uses if the
+   FARGATE launch type is used. Defaults to "LATEST".
+-  RUN_TASK_KWARGS - A JSON string containing arguments to provide the
+   ECS ``run_task`` API.
+-  SECURITY_GROUPS - Up to 5 comma-separated security group IDs
+   associated with the ECS task. Defaults to the VPC default.
+-  SUBNETS - Up to 16 comma-separated subnet IDs associated with the ECS
+   task or service. Defaults to the VPC default.
+-  TASK_DEFINITION - The family and revision (family:revision) or full
+   ARN of the ECS task definition to run. Defaults to the latest ACTIVE
+   revision.
+-  MAX_RUN_TASK_ATTEMPTS - The maximum number of times the ECS Executor
+   should attempt to run a task. This refers to instances where the task
+   fails to start (i.e. ECS API failures, container failures etc.)
+
+For a more detailed description of available options, including type
+hints and examples, see the ``config_templates`` folder in the Amazon
+provider package.
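+
+As an illustration, extra arguments for the ECS ``run_task`` API could be
+supplied via ``RUN_TASK_KWARGS`` as a JSON string (a sketch; the tag key
+and value are placeholders, and which keys are honoured depends on how
+the executor merges these kwargs with its own):
+
+.. code-block:: bash
+
+   export AIRFLOW__AWS_ECS_EXECUTOR__RUN_TASK_KWARGS='{"tags": [{"key": "team", "value": "data-eng"}]}'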
+
+.. _dockerfile_for_ecs_executor:
+
+Dockerfile for ECS Executor
+---------------------------
+
+An example Dockerfile can be found `here <https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/executors/ecs/Dockerfile>`__.
+It creates an image that can be used in an ECS container to run Airflow
+tasks using the AWS ECS Executor. The image supports AWS CLI/API
+integration, allowing you to interact with AWS services within your
+Airflow environment. It also includes options to load DAGs (Directed
+Acyclic Graphs) from either an S3 bucket or a local folder.
+
+Base Image
+~~~~~~~~~~
+
+The Docker image is built upon the ``apache/airflow:latest`` image. See
+`here <https://hub.docker.com/r/apache/airflow>`__ for more information
+about the image.
+
+Important note: The Airflow and python versions in this image must align
+with the Airflow and python versions on the host/container which is
+running the Airflow scheduler process (which in turn runs the executor).
+The Airflow version of the image can be verified by running the
+container locally with the following command:
+
+.. code-block:: bash
+
+   docker run <image_name> version
+
+Similarly, the python version of the image can be verified with the
+following command:
+
+.. code-block:: bash
+
+   docker run <image_name> python --version
+
+Ensure that these versions match the versions on the host/container
+which is running the Airflow scheduler process (and thus, the ECS
+executor). Apache Airflow images with specific python versions can be
+downloaded from the Docker Hub registry by filtering tags by the
+`python
+version <https://hub.docker.com/r/apache/airflow/tags?page=1&name=3.8>`__.
+For example, the tag ``latest-python3.8`` specifies that the image will
+have python 3.8 installed.
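+
+For instance, a version-pinned base image could be pulled with a command
+along these lines (an illustrative tag; pick the one matching the Airflow
+and python versions of your scheduler):
+
+.. code-block:: bash
+
+   docker pull apache/airflow:latest-python3.8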
+
+Prerequisites
+~~~~~~~~~~~~~
+
+Docker must be installed on your system. Instructions for installing
+Docker can be found `here <https://docs.docker.com/get-docker/>`__.
+
+AWS Credentials
+~~~~~~~~~~~~~~~
+
+The `AWS CLI <https://aws.amazon.com/cli/>`__ is installed within the
+container, and there are multiple ways to pass AWS authentication
+information to the container. This guide will cover 2 methods.
+
+The most secure method is to use IAM roles. When creating an ECS Task
+Definition, you are able to select a Task Role and a Task Execution
+Role. The Task Execution Role is the role that is used by the container
+agent to make AWS API requests on your behalf. For the purposes of the
+ECS Executor, this role needs to have at least the
+``AmazonECSTaskExecutionRolePolicy`` as well as the
+``CloudWatchLogsFullAccess`` (or ``CloudWatchLogsFullAccessV2``) policies. The 
Task Role is the role that is
+used by the containers to make AWS API requests. This role needs to have
+permissions based on the tasks that are described in the DAG being run.
+If you are loading DAGs via an S3 bucket, this role needs to have
+permission to read the S3 bucket.
+
+To create a new Task Role or Task Execution Role, follow the steps
+below:
+
+1. Navigate to the IAM page on the AWS console, and from the left hand
+   tab, under Access Management, select Roles.
+2. On the Roles page, click Create role on the top right hand corner.
+3. Under Trusted entity type, select AWS Service.
+4. Select Elastic Container Service from the drop down under Use case,
+   and Elastic Container Service Task as the specific use case. Click
+   Next.
+5. In the Permissions page, select the permissions the role will need,
+   depending on whether the role is a Task Role or a Task Execution
+   Role. Click Next after selecting all the required permissions.
+6. Enter a name for the new role, and an optional description. Review
+   the Trusted Entities, and the permissions for the role. Add any tags
+   as necessary, and click Create role.
+
+When creating the Task Definition for the ECS cluster (see the
+:ref:`setup guide <setup_guide>` for more details), select the
+appropriate newly created Task Role and Task Execution Role for the
+Task Definition.
+
+The second method is to use the build-time arguments
+(``aws_access_key_id``, ``aws_secret_access_key``,
+``aws_default_region``, and ``aws_session_token``).
+
+Note: This method is not recommended for use in production environments,
+because user credentials are stored in the container, which may be a
+security vulnerability.
+
+To pass AWS authentication information using these arguments, use the
+``--build-arg`` option during the Docker build process. For example:
+
+.. code-block:: bash
+
+   docker build -t my-airflow-image \
+    --build-arg aws_access_key_id=YOUR_ACCESS_KEY \
+    --build-arg aws_secret_access_key=YOUR_SECRET_KEY \
+    --build-arg aws_default_region=YOUR_DEFAULT_REGION \
+    --build-arg aws_session_token=YOUR_SESSION_TOKEN .
+
+Replace ``YOUR_ACCESS_KEY``, ``YOUR_SECRET_KEY``,
+``YOUR_SESSION_TOKEN``, and ``YOUR_DEFAULT_REGION`` with valid AWS
+credentials.
+
+Alternatively, you can authenticate to AWS using the ``~/.aws`` folder.
+See instructions on how to generate this folder
+`here 
<https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html>`__.
+Uncomment the line in the Dockerfile to copy the ``./.aws`` folder from
+your host machine to the container's ``/home/airflow/.aws`` directory.
+Keep in mind the Docker build context when copying the ``.aws`` folder
+to the container.
+
+Loading DAGs
+~~~~~~~~~~~~
+
+There are many ways to load DAGs on the ECS container. This Dockerfile
+is preconfigured with two possible ways: copying from a local folder, or
+downloading from an S3 bucket. Other methods of loading DAGs are
+possible as well.
+
+From S3 Bucket
+^^^^^^^^^^^^^^
+
+To load DAGs from an S3 bucket, uncomment the entrypoint line in the
+Dockerfile to synchronize the DAGs from the specified S3 bucket to the
+``/opt/airflow/dags`` directory inside the container. You can optionally
+provide ``container_dag_path`` as a build argument if you want to store
+the DAGs in a directory other than ``/opt/airflow/dags``.
+
+Add ``--build-arg s3_url=YOUR_S3_URL`` in the docker build command.
+Replace ``YOUR_S3_URL`` with the URL of your S3 bucket. Make sure you
+have the appropriate permissions to read from the bucket.
+
+Note that the following command is also passing in AWS credentials as
+build arguments.
+
+.. code-block:: bash
+
+   docker build -t my-airflow-image \
+    --build-arg aws_access_key_id=YOUR_ACCESS_KEY \
+    --build-arg aws_secret_access_key=YOUR_SECRET_KEY \
+    --build-arg aws_default_region=YOUR_DEFAULT_REGION \
+    --build-arg aws_session_token=YOUR_SESSION_TOKEN \
+    --build-arg s3_url=YOUR_S3_URL .
+
+From Local Folder
+^^^^^^^^^^^^^^^^^
+
+To load DAGs from a local folder, place your DAG files in a folder
+within the docker build context on your host machine, and provide the
+location of the folder using the ``host_dag_path`` build argument. By
+default, the DAGs will be copied to ``/opt/airflow/dags``, but this can
+be changed by passing the ``container_dag_path`` build-time argument
+during the Docker build process:
+
+.. code-block:: bash
+
+   docker build -t my-airflow-image \
+    --build-arg host_dag_path=./dags_on_host \
+    --build-arg container_dag_path=/path/on/container .
+
+If choosing to load DAGs onto a different path than
+``/opt/airflow/dags``, then the new path will need to be updated in the
+Airflow config.
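+
+For example, the new path could be set via the standard ``dags_folder``
+option, exported wherever Airflow configuration is set for the container
+(a sketch):
+
+.. code-block:: bash
+
+   export AIRFLOW__CORE__DAGS_FOLDER='/path/on/container'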
+
+Installing Python Dependencies
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This Dockerfile supports installing Python dependencies via ``pip`` from
+a ``requirements.txt`` file. Place your ``requirements.txt`` file in the
+same directory as the Dockerfile. If it is in a different location, it
+can be specified using the ``requirements_path`` build-argument. Keep in
+mind the Docker context when copying the ``requirements.txt`` file.
+Uncomment the two appropriate lines in the Dockerfile that copy the
+``requirements.txt`` file to the container, and run ``pip install`` to
+install the dependencies on the container.
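+
+For example, assuming the ``requirements.txt`` file sits next to the
+Dockerfile in the build context (a sketch; adjust ``requirements_path``
+if it lives elsewhere):
+
+.. code-block:: bash
+
+   docker build -t my-airflow-image \
+    --build-arg requirements_path=./requirements.txt .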
+
+Building Image for ECS Executor
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Detailed instructions on how to use the Docker image that you have
+created via this guide with the ECS Executor can be found
+:ref:`here <setup_guide>`.
+
+.. _logging:
+
+Logging
+-------
+
+Airflow tasks executed via this executor run in ECS containers within
+the configured VPC. This means that logs are not directly accessible to
+the Airflow Webserver, and when containers are stopped after task
+completion, the logs would be permanently lost.
+
+Remote logging should be employed when using the ECS executor to persist
+your Airflow Task logs and make them viewable from the Airflow
+Webserver.
+
+Configuring Remote Logging
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+There are many ways to configure remote logging and several supported
+destinations. A general overview of Airflow Task logging can be found
+`here 
<https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/logging-tasks.html>`__.
+Instructions for configuring S3 remote logging can be found
+`here 
<https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/logging/s3-task-handler.html>`__
+and Cloudwatch remote logging
+`here 
<https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/logging/cloud-watch-task-handlers.html>`__.
+Some important things to point out for remote logging in the context of
+the ECS executor:
+
+-  The configuration options for Airflow remote logging should be
+   configured on all hosts and containers running Airflow. For example,
+   the Webserver requires this config so that it can fetch logs from
+   the remote location, and the ECS container requires the config so
+   that it can upload the logs to the remote location. See
+   `here 
<https://airflow.apache.org/docs/apache-airflow/stable/howto/set-config.html>`__
+   to read more about how to set Airflow configuration via config file
+   or environment variable exports.
+-  Adding the Airflow remote logging config to the container can be done
+   in many ways. Some examples include, but are not limited to:
+
+   -  Exported as environment variables directly in the Dockerfile (see
+      the Dockerfile section :ref:`above <dockerfile_for_ecs_executor>`)
+   -  Updating the ``airflow.cfg`` file or copying/mounting/downloading
+      a custom ``airflow.cfg`` in the Dockerfile.
+   -  Added in the ECS Task Definition in plain text or via
+      `Secrets/System
+      Manager 
<https://docs.aws.amazon.com/AmazonECS/latest/developerguide/secrets-envvar.html>`__
+   -  Or, using `ECS Task Environment
+      Files 
<https://docs.aws.amazon.com/AmazonECS/latest/developerguide/taskdef-envfiles.html>`__
+
+-  You must have credentials configured within the container to be able
+   to interact with the remote service for your logs (e.g. S3,
+   CloudWatch Logs, etc). This can be done in many ways. Some examples
+   include, but are not limited to:
+
+   -  Export credentials into the Dockerfile directly (see the
+      Dockerfile section :ref:`above <dockerfile_for_ecs_executor>`)
+   -  Configure an Airflow Connection and provide this as the `remote
+      logging conn
+      id 
<https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#remote-log-conn-id>`__
+      (exported into the container by any of the means listed above or
+      your preferred method). Airflow will then use these credentials
+      *specifically* for interacting with your chosen remote logging
+      destination.
+
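+For example, S3 remote logging could be enabled with configuration along
+these lines (a sketch; the bucket path and connection id are placeholders
+and should be exported wherever Airflow configuration is set for your
+environment):
+
+.. code-block:: bash
+
+   export AIRFLOW__LOGGING__REMOTE_LOGGING='True'
+   export AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER='s3://your-bucket/airflow-task-logs'
+   export AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID='aws_default'
+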
+.. note::
+   Configuration options must be consistent across all the hosts/environments 
running the Airflow components (Scheduler, Webserver, ECS Task containers, 
etc). See `here 
<https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html>`__
 for more details on setting configurations.
+
+ECS Task Logging
+~~~~~~~~~~~~~~~~
+
+ECS can be configured to use the awslogs log driver to send log
+information to CloudWatch Logs for the ECS Tasks themselves. These logs
+will include the Airflow Task Operator logging and any other logging
+that occurs throughout the life of the process running in the container
+(in this case the Airflow CLI command ``airflow tasks run ...``). This
+can be helpful for debugging issues with remote logging or while testing
+remote logging configuration. Information on enabling this logging can
+be found
+`here 
<https://docs.aws.amazon.com/AmazonECS/latest/developerguide/using_awslogs.html>`__.
+
+**Note: These logs will NOT be viewable from the Airflow Webserver UI.**
+
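+Once the awslogs driver is enabled, the container logs can be followed
+from the command line with the AWS CLI, for example (a sketch; the log
+group name is a placeholder determined by your Task Definition):
+
+.. code-block:: bash
+
+   aws logs tail /ecs/my-airflow-task-logs --follow
+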
+Performance and Tuning
+~~~~~~~~~~~~~~~~~~~~~~
+
+While the ECS executor adds about 50-60 seconds of latency to each
+Airflow task execution, due to container startup time, it allows for a
+higher degree of parallelism and isolation. We have tested this executor
+with over 1,000 tasks scheduled in parallel and observed that up to 500
+tasks could run simultaneously. The limit of 500 tasks is
+in accordance with `ECS Service
+Quotas 
<https://docs.aws.amazon.com/AmazonECS/latest/developerguide/service-quotas.html>`__.
+
+When running this executor, and Airflow generally, at a large scale,
+there are some configuration options to take into consideration. Many of
+the configurations below will either limit how many tasks can run
+concurrently or affect the performance of the scheduler.
+
+-  `core.max_active_tasks_per_dag 
<https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-active-tasks-per-dag>`__
+-  `core.max_active_runs_per_dag 
<https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-active-runs-per-dag>`__
+-  `core.parallelism 
<https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#parallelism>`__
+-  `scheduler.max_tis_per_query 
<https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-tis-per-query>`__
+-  `core.default_pool_task_slot_count
+   <https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#default-pool-task-slot-count>`__
+-  `scheduler.scheduler_health_check_threshold
+   <https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#scheduler-health-check-threshold>`__
+
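+For example, concurrency limits could be raised with settings along these
+lines (a sketch; the values are placeholders and should be tuned for your
+environment and ECS service quotas):
+
+.. code-block:: bash
+
+   export AIRFLOW__CORE__PARALLELISM='512'
+   export AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG='256'
+   export AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY='64'
+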
+.. _setup_guide:
+
+
+Setting up an ECS Executor for Apache Airflow
+---------------------------------------------
+
+There are 3 steps involved in getting an ECS Executor to work in Apache 
Airflow:
+
+1. Creating a database that Airflow and the tasks running in ECS can connect 
to.
+
+2. Creating and configuring an ECS Cluster that can run tasks from Airflow.
+
+3. Configuring Airflow to use the ECS Executor and the database.
+
+There are different options for selecting a database backend. See `here 
<https://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html>`_
 for more information about the different options supported by Airflow. The 
following guide will explain how to set up a PostgreSQL RDS Instance on AWS. 
The guide will also cover setting up an ECS cluster. The ECS Executor supports 
various launch types, but this guide will explain how to set up an ECS Fargate 
cluster.
+
+
+Setting up an RDS DB Instance for ECS Executors
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Create the RDS DB Instance
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+1. Log in to your AWS Management Console and navigate to the RDS service.
+
+2. Click "Create database" to start creating a new RDS instance.
+
+3. Choose the "Standard create" option, and select PostgreSQL.
+
+4. Select the appropriate template, availability and durability.
+
+   - NOTE: At the time of this writing, the "Multi-AZ DB **Cluster**" option 
does not support setting the database name, which is a required step below.
+5. Set the DB Instance name, the username and password.
+
+6. Choose the instance configuration, and storage parameters.
+
+7. In the Connectivity section, select "Don't connect to an EC2 compute resource".
+
+8. Select or create a VPC and subnet, and allow public access to the DB.
+   Select or create a security group, and select the Availability Zone.
+
+9.  Open the Additional Configuration tab and set the database name to 
``airflow_db``.
+
+10. Select other settings as required, and create the database by clicking 
Create database.
+
+
+Test Connectivity
+~~~~~~~~~~~~~~~~~
+
+In order to be able to connect to the new RDS instance, you need to allow 
inbound traffic to the database from your IP address.
+
+
+1. Under the "Security" heading in the "Connectivity & security" tab of the 
RDS instance, find the link to the VPC security group for your new RDS DB 
instance.
+
+2. Create an inbound rule that allows traffic from your IP address(es) on TCP 
port 5432 (PostgreSQL).
+
+3. Confirm that you can connect to the DB after modifying the security group. 
This will require having ``psql`` installed. Instructions for installing 
``psql`` can be found `here <https://www.postgresql.org/download/>`__.
+
+**NOTE**: Be sure that the status of your DB is Available before testing
+connectivity.
+
+.. code-block:: bash
+
+   psql -h <endpoint> -p 5432 -U <username> <db_name>
+
+The endpoint can be found on the "Connectivity and Security" tab; the
+username (and password) are the credentials used when creating the
+database.
+
+The db_name should be ``airflow_db`` (unless a different one was used
+when creating the database).
+
+You will be prompted to enter the password if the connection is successful.
+
+
+Creating an ECS Cluster with Fargate, and Task Definitions
+----------------------------------------------------------
+
+In order to create a Task Definition for the ECS Cluster that will work with 
Apache Airflow, you will need a Docker image that is properly configured. See 
the :ref:`Dockerfile <dockerfile_for_ecs_executor>` section for instructions on 
how to do that.
+
+Once the image is built, it needs to be put in a repository where it can be 
pulled by ECS. There are multiple ways to accomplish this. This guide will go 
over doing this using Amazon Elastic Container Registry (ECR).
+
+Create an ECR Repository
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+1. Log in to your AWS Management Console and navigate to the ECR service.
+
+2. Click Create repository.
+
+3. Name the repository and fill out other information as required.
+
+4. Click Create Repository.
+
+5. Once the repository has been created, click on the repository. Click on the 
"View push commands" button on the top right.
+
+6. Follow the instructions to push the Docker image, replacing image
+   names as appropriate (a sketch of typical push commands is shown
+   below). Ensure the image is uploaded by refreshing the page once the
+   image is pushed.
+
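+The push commands shown in the console typically look like the following
+(a sketch; the region, account id, repository name and image tag are
+placeholders):
+
+.. code-block:: bash
+
+   aws ecr get-login-password --region us-west-2 | \
+    docker login --username AWS --password-stdin 123456789012.dkr.ecr.us-west-2.amazonaws.com
+   docker tag my-airflow-image:latest 123456789012.dkr.ecr.us-west-2.amazonaws.com/my-airflow-repo:latest
+   docker push 123456789012.dkr.ecr.us-west-2.amazonaws.com/my-airflow-repo:latest
+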
+Create ECS Cluster
+~~~~~~~~~~~~~~~~~~
+
+1. Log in to your AWS Management Console and navigate to the Amazon Elastic 
Container Service.
+
+2. Click "Clusters" then click "Create Cluster".
+
+3. Make sure that AWS Fargate (Serverless) is selected under Infrastructure.
+
+4. Select other options as required and click Create to create the cluster.
+
+Create Task Definition
+~~~~~~~~~~~~~~~~~~~~~~
+
+1. Click on Task Definitions on the left hand bar, and click Create new task 
definition.
+
+2. Choose the Task Definition Family name. Select AWS Fargate for the Launch 
Type.
+
+3. Select or create the Task Role and Task Execution Role, and ensure the 
roles have the required permissions to accomplish their respective tasks. You 
can choose to create a new Task Execution role that will have the basic minimum 
permissions in order for the task to run.
+
+4. Select a name for the container, and use the image URI of the image that 
was pushed in the previous section. Make sure the role being used has the 
required permissions to pull the image.
+
+5. Add the following environment variables to the container:
+
+- ``AIRFLOW__DATABASE__SQL_ALCHEMY_CONN``, with the value being the PostgreSQL 
connection string in the following format using the values set during the 
`Database section <#create-the-rds-db-instance>`_ above:
+
+.. code-block:: bash
+
+   postgresql+psycopg2://<username>:<password>@<endpoint>/<database_name>
+
+
+- ``AIRFLOW__AWS_ECS_EXECUTOR__SECURITY_GROUPS``, with the value being a
+  comma separated list of security group IDs associated with the VPC
+  used for the RDS instance.
+
+- ``AIRFLOW__AWS_ECS_EXECUTOR__SUBNETS``, with the value being a comma
+  separated list of subnet IDs of the subnets associated with the RDS
+  instance.
+
+6. Add other configuration as necessary for Airflow generally (see `here
+   <https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html>`__),
+   the ECS executor (see :ref:`here <config-options>`) or for remote
+   logging (see :ref:`here <logging>`). Note that any configuration
+   changes should be made across the entire Airflow environment to keep
+   configuration consistent.
+
+7. Click Create.
+
+Allow ECS Containers to Access RDS Database
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+As a final step, access to the database must be configured for the ECS 
containers. Many different networking configurations are possible, but one 
possible approach is:
+
+1. Log in to your AWS Management Console and navigate to the VPC Dashboard.
+
+2. On the left hand, under the Security heading, click Security groups.
+
+3. Select the security group associated with your RDS instance, and click Edit 
inbound rules.
+
+4. Add a new rule that allows PostgreSQL type traffic to the CIDR of the 
subnet(s) associated with the DB.
+
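+Equivalently, the inbound rule can be added with the AWS CLI, for example
+(a sketch; the security group id and CIDR are placeholders):
+
+.. code-block:: bash
+
+   aws ec2 authorize-security-group-ingress \
+    --group-id sg-0123456789abcdef0 \
+    --protocol tcp --port 5432 \
+    --cidr 172.31.0.0/16
+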
+Configure Airflow
+~~~~~~~~~~~~~~~~~
+
+To configure Airflow to utilize the ECS Executor and leverage the resources 
we've set up, create a script (e.g., ``ecs_executor_config.sh``) with the 
following contents:
+
+.. code-block:: bash
+
+   export AIRFLOW__CORE__EXECUTOR='airflow.providers.amazon.aws.executors.ecs.AwsEcsExecutor'
+
+   export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=<postgres-connection-string>
+
+   export AIRFLOW__AWS_ECS_EXECUTOR__REGION=<executor-region>
+
+   export AIRFLOW__AWS_ECS_EXECUTOR__CLUSTER=<ecs-cluster-name>
+
+   export AIRFLOW__AWS_ECS_EXECUTOR__CONTAINER_NAME=<ecs-container-name>
+
+   export AIRFLOW__AWS_ECS_EXECUTOR__TASK_DEFINITION=<task-definition-name>
+
+   export AIRFLOW__AWS_ECS_EXECUTOR__LAUNCH_TYPE='FARGATE'
+
+   export AIRFLOW__AWS_ECS_EXECUTOR__PLATFORM_VERSION='LATEST'
+
+   export AIRFLOW__AWS_ECS_EXECUTOR__ASSIGN_PUBLIC_IP='True'
+
+   export AIRFLOW__AWS_ECS_EXECUTOR__SECURITY_GROUPS=<security-group-id-for-rds>
+
+   export AIRFLOW__AWS_ECS_EXECUTOR__SUBNETS=<subnet-id-for-rds>
+
+
+This script should be run on the host(s) running the Airflow Scheduler and 
Webserver, before those processes are started.
+
+The script sets environment variables that configure Airflow to use the ECS 
Executor and provide necessary information for task execution. Any other 
configuration changes made (such as for remote logging) should be added to this 
example script to keep configuration consistent across the Airflow environment.
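+
+For example, the script could be sourced in the same shell session before
+the Airflow processes are launched (a sketch):
+
+.. code-block:: bash
+
+   # Load the executor configuration, then start the Scheduler (and,
+   # similarly, the Webserver) in the same environment.
+   source ecs_executor_config.sh
+   airflow scheduler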
+
+Initialize the Airflow DB
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The Airflow DB needs to be initialized before it can be used, and a user
+needs to be added for you to log in. The command below adds an admin
+user (the command will also initialize the DB if it hasn't been
+already):
+
+.. code-block:: bash
+
+   airflow users create --username admin --password admin \
+    --firstname <your first name> --lastname <your last name> \
+    --email <your email> --role Admin
diff --git a/docs/apache-airflow-providers-amazon/executors/index.rst 
b/docs/apache-airflow-providers-amazon/executors/index.rst
new file mode 100644
index 0000000000..139ab0003d
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/executors/index.rst
@@ -0,0 +1,27 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Amazon Executors
+================
+
+
+.. toctree::
+    :maxdepth: 1
+
+    ECS Executor (experimental) <ecs-executor>
diff --git a/docs/apache-airflow-providers-amazon/index.rst 
b/docs/apache-airflow-providers-amazon/index.rst
index aaa1ab60ee..7959c3c651 100644
--- a/docs/apache-airflow-providers-amazon/index.rst
+++ b/docs/apache-airflow-providers-amazon/index.rst
@@ -41,6 +41,7 @@
     Secrets backends <secrets-backends/index>
     Logging for Tasks <logging/index>
     Configuration <configurations-ref>
+    Executors <executors/index>
 
 .. toctree::
     :hidden:
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index e50755f4a1..741ed6b67d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -122,6 +122,7 @@ Avro
 avro
 aws
 awsbatch
+awslogs
 Azkaban
 Azri
 backcompat
@@ -516,6 +517,7 @@ dynamodb
 dynload
 ec
 ecb
+ecs
 EdgeModifier
 EdgeModifiers
 EDITMSG
@@ -1622,6 +1624,7 @@ Un
 un
 unarchived
 unassigns
+uncomment
 uncommenting
 Undead
 undead
diff --git a/generated/provider_dependencies.json 
b/generated/provider_dependencies.json
index 692defdbc0..3e4479921d 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -27,6 +27,7 @@
       "asgiref",
       "boto3>=1.28.0",
       "botocore>=1.31.0",
+      "inflection>=0.5.1",
       "jsonpath_ng>=1.5.3",
       "redshift_connector>=2.0.888",
       "sqlalchemy_redshift>=0.8.6",
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index 8137f1031d..e2b94c0051 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -1600,6 +1600,7 @@ def test_sensitive_values():
     suspected_sensitive = {(s, k) for (s, k) in all_keys if 
k.endswith(("password", "kwargs"))}
     exclude_list = {
         ("kubernetes_executor", "delete_option_kwargs"),
+        ("aws_ecs_executor", "run_task_kwargs"),  # Only a constrained set of 
values, none are sensitive
     }
     suspected_sensitive -= exclude_list
     sensitive_values.update(suspected_sensitive)
diff --git a/tests/providers/amazon/aws/executors/__init__.py 
b/tests/providers/amazon/aws/executors/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/amazon/aws/executors/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/amazon/aws/executors/ecs/__init__.py 
b/tests/providers/amazon/aws/executors/ecs/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/amazon/aws/executors/ecs/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py 
b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
new file mode 100644
index 0000000000..5fdbd310bb
--- /dev/null
+++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -0,0 +1,919 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime as dt
+import json
+import os
+import re
+import time
+from functools import partial
+from typing import Callable
+from unittest import mock
+
+import pytest
+import yaml
+from inflection import camelize
+
+from airflow.executors.base_executor import BaseExecutor
+from airflow.providers.amazon.aws.executors.ecs import (
+    CONFIG_GROUP_NAME,
+    AllEcsConfigKeys,
+    AwsEcsExecutor,
+    EcsTaskCollection,
+    ecs_executor_config,
+)
+from airflow.providers.amazon.aws.executors.ecs.boto_schema import 
BotoTaskSchema
+from airflow.providers.amazon.aws.executors.ecs.utils import (
+    CONFIG_DEFAULTS,
+    EcsExecutorTask,
+    _recursive_flatten_dict,
+    parse_assign_public_ip,
+)
+from airflow.utils.helpers import convert_camel_to_snake
+from airflow.utils.state import State
+
+ARN1 = "arn1"
+ARN2 = "arn2"
+ARN3 = "arn3"
+
+
+def mock_task(arn=ARN1, state=State.RUNNING):
+    task = mock.Mock(spec=EcsExecutorTask, task_arn=arn)
+    task.api_failure_count = 0
+    task.get_task_state.return_value = state
+
+    return task
+
+
+# These first two fixtures look unusual.  For tests which do not care if the 
object
+# returned by the fixture is unique, use it like a normal fixture.  If your 
test
+# requires a unique value, then call it like a method.
+#
+# See `test_info_by_key` for an example of a test that requires two unique 
mocked queues.
+
+
+@pytest.fixture(autouse=True)
+def mock_airflow_key():
+    def _key():
+        return mock.Mock(spec=tuple)
+
+    return _key
+
+
+@pytest.fixture(autouse=True)
+def mock_queue():
+    def _queue():
+        return mock.Mock(spec=str)
+
+    return _queue
+
+
+# The following two fixtures look different because no existing test
+# cares if they have unique values, so the same value is always used.
+@pytest.fixture(autouse=True)
+def mock_cmd():
+    return mock.Mock(spec=list)
+
+
+@pytest.fixture(autouse=True)
+def mock_config():
+    return mock.Mock(spec=dict)
+
+
+@pytest.fixture
+def mock_executor() -> AwsEcsExecutor:
+    """Mock ECS to a repeatable starting state."""
+    
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.REGION_NAME}".upper()]
 = "us-west-1"
+    
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CLUSTER}".upper()] 
= "some-cluster"
+    
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CONTAINER_NAME}".upper()]
 = "container-name"
+    
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.TASK_DEFINITION}".upper()]
 = "some-task-def"
+    
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.LAUNCH_TYPE}".upper()]
 = "FARGATE"
+    
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.PLATFORM_VERSION}".upper()]
 = "LATEST"
+    
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.ASSIGN_PUBLIC_IP}".upper()]
 = "False"
+    
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.SECURITY_GROUPS}".upper()]
 = "sg1,sg2"
+    
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.SUBNETS}".upper()] 
= "sub1,sub2"
+    
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS}".upper()]
 = "3"
+    executor = AwsEcsExecutor()
+
+    # Replace boto3 ECS client with mock.
+    ecs_mock = mock.Mock(spec=executor.ecs)
+    run_task_ret_val = {"tasks": [{"taskArn": ARN1}], "failures": []}
+    ecs_mock.run_task.return_value = run_task_ret_val
+    executor.ecs = ecs_mock
+
+    return executor
+
+
+class TestEcsTaskCollection:
+    """Tests EcsTaskCollection Class."""
+
+    # You can't use a fixture in setup_method unless you declare setup_method 
to be a fixture itself.
+    @pytest.fixture(autouse=True)
+    def setup_method(self, mock_airflow_key):
+        # Create a new Collection and verify it is empty.
+        self.collection = EcsTaskCollection()
+        assert len(self.collection) == 0
+
+        # Generate two mock keys and assert they are different.  If the value
+        # of the key does not matter for a test, let it use the auto-fixture.
+        self.key1 = mock_airflow_key()
+        self.key2 = mock_airflow_key()
+        assert self.key1 != self.key2
+
+    def test_add_task(self):
+        # Add a task, verify that the collection has grown and the task arn 
matches.
+        self.collection.add_task(mock_task(ARN1), self.key1, mock_queue, 
mock_cmd, mock_config, 1)
+        assert len(self.collection) == 1
+        assert self.collection.tasks[ARN1].task_arn == ARN1
+
+        # Add a task, verify that the collection has grown and the task arn is 
not the same as the first.
+        self.collection.add_task(mock_task(ARN2), self.key2, mock_queue, 
mock_cmd, mock_config, 1)
+        assert len(self.collection) == 2
+        assert self.collection.tasks[ARN2].task_arn == ARN2
+        assert self.collection.tasks[ARN2].task_arn != 
self.collection.tasks[ARN1].task_arn
+
+    def test_task_by_key(self):
+        self.collection.add_task(mock_task(), mock_airflow_key, mock_queue, 
mock_cmd, mock_config, 1)
+
+        task = self.collection.task_by_key(mock_airflow_key)
+
+        assert task == self.collection.tasks[ARN1]
+
+    def test_task_by_arn(self):
+        self.collection.add_task(mock_task(), mock_airflow_key, mock_queue, 
mock_cmd, mock_config, 1)
+
+        task = self.collection.task_by_arn(ARN1)
+
+        assert task == self.collection.tasks[ARN1]
+
+    def test_info_by_key(self, mock_queue):
+        self.collection.add_task(mock_task(ARN1), self.key1, queue1 := 
mock_queue(), mock_cmd, mock_config, 1)
+        self.collection.add_task(mock_task(ARN2), self.key2, queue2 := 
mock_queue(), mock_cmd, mock_config, 1)
+        assert queue1 != queue2
+
+        task1_info = self.collection.info_by_key(self.key1)
+        assert task1_info.queue == queue1
+        assert task1_info.cmd == mock_cmd
+        assert task1_info.config == mock_config
+
+        task2_info = self.collection.info_by_key(self.key2)
+        assert task2_info.queue == queue2
+        assert task2_info.cmd == mock_cmd
+        assert task2_info.config == mock_config
+
+        assert task1_info != task2_info
+
+    def test_get_all_arns(self):
+        self.collection.add_task(mock_task(ARN1), self.key1, mock_queue, 
mock_cmd, mock_config, 1)
+        self.collection.add_task(mock_task(ARN2), self.key2, mock_queue, 
mock_cmd, mock_config, 1)
+
+        assert self.collection.get_all_arns() == [ARN1, ARN2]
+
+    def test_get_all_task_keys(self):
+        self.collection.add_task(mock_task(ARN1), self.key1, mock_queue, 
mock_cmd, mock_config, 1)
+        self.collection.add_task(mock_task(ARN2), self.key2, mock_queue, 
mock_cmd, mock_config, 1)
+
+        assert self.collection.get_all_task_keys() == [self.key1, self.key2]
+
+    def test_pop_by_key(self):
+        self.collection.add_task(mock_task(ARN1), self.key1, mock_queue, 
mock_cmd, mock_config, 1)
+        self.collection.add_task(mock_task(ARN2), self.key2, mock_queue, 
mock_cmd, mock_config, 1)
+        task1_as_saved = self.collection.tasks[ARN1]
+
+        assert len(self.collection) == 2
+        task1_as_popped = self.collection.pop_by_key(self.key1)
+        assert len(self.collection) == 1
+        # Assert it returns the same task.
+        assert task1_as_popped == task1_as_saved
+        # Assert the popped task is removed.
+        with pytest.raises(KeyError):
+            assert self.collection.task_by_key(self.key1)
+        # Assert the remaining task is task2.
+        assert self.collection.task_by_key(self.key2)
+
+    def test_update_task(self):
+        self.collection.add_task(
+            initial_task := mock_task(), mock_airflow_key, mock_queue, 
mock_cmd, mock_config, 1
+        )
+        assert self.collection[ARN1] == initial_task
+        self.collection.update_task(updated_task := mock_task())
+
+        assert self.collection[ARN1] == updated_task
+        assert initial_task != updated_task
+
+    def test_failure_count(self):
+        # Create a new Collection and add two tasks.
+        self.collection.add_task(mock_task(ARN1), self.key1, mock_queue, 
mock_cmd, mock_config, 1)
+        self.collection.add_task(mock_task(ARN2), self.key2, mock_queue, 
mock_cmd, mock_config, 1)
+
+        # failure_count is set to attempt number, which is initialized as 1.
+        assert self.collection.failure_count_by_key(self.key1) == 1
+        for i in range(1, 5):
+            self.collection.increment_failure_count(self.key1)
+            assert self.collection.failure_count_by_key(self.key1) == i + 1
+        assert self.collection.failure_count_by_key(self.key2) == 1
+
+
+class TestEcsExecutorTask:
+    """Tests the EcsExecutorTask DTO."""
+
+    def test_repr(self):
+        last_status = "QUEUED"
+        desired_status = "SUCCESS"
+        running_task = EcsExecutorTask(
+            task_arn=ARN1, last_status=last_status, 
desired_status=desired_status, containers=[{}]
+        )
+        assert f"({ARN1}, {last_status}->{desired_status}, 
{running_task.get_task_state()})" == repr(
+            running_task
+        )
+
+    def test_queued_tasks(self):
+        """Tasks that are pending launch identified as 'queued'."""
+        queued_tasks = [
+            EcsExecutorTask(
+                task_arn=ARN1, last_status="PROVISIONING", 
desired_status="RUNNING", containers=[{}]
+            ),
+            EcsExecutorTask(task_arn=ARN2, last_status="PENDING", 
desired_status="RUNNING", containers=[{}]),
+            EcsExecutorTask(
+                task_arn=ARN3, last_status="ACTIVATING", 
desired_status="RUNNING", containers=[{}]
+            ),
+        ]
+        for task in queued_tasks:
+            assert State.QUEUED == task.get_task_state()
+
+    def test_running_tasks(self):
+        """Tasks that have been launched are identified as 'running'."""
+        running_task = EcsExecutorTask(
+            task_arn=ARN1, last_status="RUNNING", desired_status="RUNNING", 
containers=[{}]
+        )
+        assert State.RUNNING == running_task.get_task_state()
+
+    def test_running_tasks_edge_cases(self):
+        """Tasks that have been launched but are not yet finished are identified as 'running'."""
+        running_task = EcsExecutorTask(
+            task_arn=ARN1, last_status="QUEUED", desired_status="SUCCESS", 
containers=[{}]
+        )
+        assert State.RUNNING == running_task.get_task_state()
+
+    def test_removed_tasks(self):
+        """Tasks that failed to launch are identified as 'removed'."""
+        deprovisioning_tasks = [
+            EcsExecutorTask(
+                task_arn=ARN1, last_status="DEACTIVATING", 
desired_status="STOPPED", containers=[{}]
+            ),
+            EcsExecutorTask(task_arn=ARN2, last_status="STOPPING", 
desired_status="STOPPED", containers=[{}]),
+            EcsExecutorTask(
+                task_arn=ARN3, last_status="DEPROVISIONING", 
desired_status="STOPPED", containers=[{}]
+            ),
+        ]
+        for task in deprovisioning_tasks:
+            assert State.REMOVED == task.get_task_state()
+
+        removed_task = EcsExecutorTask(
+            task_arn="DEAD",
+            last_status="STOPPED",
+            desired_status="STOPPED",
+            containers=[{}],
+            stopped_reason="Timeout waiting for network interface provisioning 
to complete.",
+        )
+        assert State.REMOVED == removed_task.get_task_state()
+
+    def test_stopped_tasks(self):
+        """Tasks that have terminated are identified as either 'success' or 
'failure'."""
+        successful_container = {"exit_code": 0, "last_status": "STOPPED"}
+        error_container = {"exit_code": 100, "last_status": "STOPPED"}
+
+        for status in ("DEACTIVATING", "STOPPING", "DEPROVISIONING", 
"STOPPED"):
+            success_task = EcsExecutorTask(
+                task_arn="GOOD",
+                last_status=status,
+                desired_status="STOPPED",
+                stopped_reason="Essential container in task exited",
+                started_at=dt.datetime.now(),
+                containers=[successful_container],
+            )
+            assert State.SUCCESS == success_task.get_task_state()
+
+        for status in ("DEACTIVATING", "STOPPING", "DEPROVISIONING", 
"STOPPED"):
+            failed_task = EcsExecutorTask(
+                task_arn="FAIL",
+                last_status=status,
+                desired_status="STOPPED",
+                stopped_reason="Essential container in task exited",
+                started_at=dt.datetime.now(),
+                containers=[successful_container, successful_container, 
error_container],
+            )
+            assert State.FAILED == failed_task.get_task_state()
+
+
+class TestAwsEcsExecutor:
+    """Tests the AWS ECS Executor."""
+
+    def teardown_method(self) -> None:
+        self._unset_conf()
+
+    def test_execute(self, mock_airflow_key, mock_executor):
+        """Test execution from end-to-end."""
+        airflow_key = mock_airflow_key()
+
+        mock_executor.ecs.run_task.return_value = {
+            "tasks": [
+                {
+                    "taskArn": ARN1,
+                    "lastStatus": "",
+                    "desiredStatus": "",
+                    "containers": [{"name": "some-ecs-container"}],
+                }
+            ],
+            "failures": [],
+        }
+
+        assert 0 == len(mock_executor.pending_tasks)
+        mock_executor.execute_async(airflow_key, mock_cmd)
+        assert 1 == len(mock_executor.pending_tasks)
+
+        mock_executor.attempt_task_runs()
+        mock_executor.ecs.run_task.assert_called_once()
+
+        # Task is stored in active worker.
+        assert 1 == len(mock_executor.active_workers)
+        assert ARN1 in 
mock_executor.active_workers.task_by_key(airflow_key).task_arn
+
+    def test_success_execute_api_exception(self, mock_executor):
+        """Test what happens when ECS throws an exception, but ultimately runs 
the task."""
+        run_task_exception = Exception("Test exception")
+        run_task_success = {
+            "tasks": [
+                {
+                    "taskArn": ARN1,
+                    "lastStatus": "",
+                    "desiredStatus": "",
+                    "containers": [{"name": "some-ecs-container"}],
+                }
+            ],
+            "failures": [],
+        }
+        mock_executor.ecs.run_task.side_effect = [run_task_exception, 
run_task_exception, run_task_success]
+        mock_executor.execute_async(mock_airflow_key, mock_cmd)
+
+        # Fail 2 times
+        for _ in range(2):
+            mock_executor.attempt_task_runs()
+            # Task is not stored in active workers.
+            assert len(mock_executor.active_workers) == 0
+
+        # Pass in last attempt
+        mock_executor.attempt_task_runs()
+        assert len(mock_executor.pending_tasks) == 0
+        assert ARN1 in mock_executor.active_workers.get_all_arns()
+
+    def test_failed_execute_api_exception(self, mock_executor):
+        """Test what happens when ECS refuses to execute a task and throws an 
exception"""
+        mock_executor.ecs.run_task.side_effect = Exception("Test exception")
+        mock_executor.execute_async(mock_airflow_key, mock_cmd)
+
+        # No matter what, don't schedule until run_task becomes successful.
+        for _ in range(int(mock_executor.MAX_RUN_TASK_ATTEMPTS) * 2):
+            mock_executor.attempt_task_runs()
+            # Task is not stored in active workers.
+            assert len(mock_executor.active_workers) == 0
+
+    def test_failed_execute_api(self, mock_executor):
+        """Test what happens when ECS refuses to execute a task."""
+        mock_executor.ecs.run_task.return_value = {
+            "tasks": [],
+            "failures": [
+                {"arn": ARN1, "reason": "Sample Failure", "detail": "UnitTest 
Failure - Please ignore"}
+            ],
+        }
+
+        mock_executor.execute_async(mock_airflow_key, mock_cmd)
+
+        # No matter what, don't schedule until run_task becomes successful.
+        for _ in range(int(mock_executor.MAX_RUN_TASK_ATTEMPTS) * 2):
+            mock_executor.attempt_task_runs()
+            # Task is not stored in active workers.
+            assert len(mock_executor.active_workers) == 0
+
+    @mock.patch.object(BaseExecutor, "fail")
+    @mock.patch.object(BaseExecutor, "success")
+    def test_sync(self, success_mock, fail_mock, mock_executor):
+        """Test sync from end-to-end."""
+        self._mock_sync(mock_executor)
+
+        mock_executor.sync_running_tasks()
+        mock_executor.ecs.describe_tasks.assert_called_once()
+
+        # Task is not stored in active workers.
+        assert len(mock_executor.active_workers) == 0
+        # Task is immediately succeeded.
+        success_mock.assert_called_once()
+        fail_mock.assert_not_called()
+
+    @mock.patch.object(BaseExecutor, "fail")
+    @mock.patch.object(BaseExecutor, "success")
+    @mock.patch.object(EcsTaskCollection, "get_all_arns", return_value=[])
+    def test_sync_short_circuits_with_no_arns(self, _, success_mock, 
fail_mock, mock_executor):
+        self._mock_sync(mock_executor)
+
+        mock_executor.sync_running_tasks()
+
+        mock_executor.ecs.describe_tasks.assert_not_called()
+        fail_mock.assert_not_called()
+        success_mock.assert_not_called()
+
+    @mock.patch.object(BaseExecutor, "fail")
+    @mock.patch.object(BaseExecutor, "success")
+    def test_failed_sync(self, success_mock, fail_mock, mock_executor):
+        """Test success and failure states."""
+        self._mock_sync(mock_executor, State.FAILED)
+
+        mock_executor.sync()
+        mock_executor.ecs.describe_tasks.assert_called_once()
+
+        # Task is not stored in active workers.
+        assert len(mock_executor.active_workers) == 0
+        # Task has immediately failed.
+        fail_mock.assert_called_once()
+        success_mock.assert_not_called()
+
+    @mock.patch.object(BaseExecutor, "fail")
+    @mock.patch.object(BaseExecutor, "success")
+    def test_removed_sync(self, fail_mock, success_mock, mock_executor):
+        """A removed task will increment failure count but call neither fail() 
nor success()."""
+        self._mock_sync(mock_executor, expected_state=State.REMOVED, 
set_task_state=State.REMOVED)
+        task_instance_key = mock_executor.active_workers.arn_to_key[ARN1]
+
+        mock_executor.sync_running_tasks()
+
+        assert ARN1 in mock_executor.active_workers.get_all_arns()
+        assert 
mock_executor.active_workers.key_to_failure_counts[task_instance_key] == 2
+        fail_mock.assert_not_called()
+        success_mock.assert_not_called()
+
+    @mock.patch.object(BaseExecutor, "fail")
+    @mock.patch.object(BaseExecutor, "success")
+    def test_failed_sync_cumulative_fail(self, success_mock, fail_mock, 
mock_airflow_key, mock_executor):
+        """Test that failure_count/attempt_number is cumulative for pending 
tasks and active workers."""
+        AwsEcsExecutor.MAX_RUN_TASK_ATTEMPTS = "5"
+        mock_executor.ecs.run_task.return_value = {
+            "tasks": [],
+            "failures": [
+                {"arn": ARN1, "reason": "Sample Failure", "detail": "UnitTest 
Failure - Please ignore"}
+            ],
+        }
+        task_key = mock_airflow_key()
+        mock_executor.execute_async(task_key, mock_cmd)
+        for _ in range(2):
+            assert len(mock_executor.pending_tasks) == 1
+            keys = [task.key for task in mock_executor.pending_tasks]
+            assert task_key in keys
+            mock_executor.attempt_task_runs()
+            assert len(mock_executor.pending_tasks) == 1
+
+        mock_executor.ecs.run_task.return_value = {
+            "tasks": [
+                {
+                    "taskArn": ARN1,
+                    "lastStatus": "",
+                    "desiredStatus": "",
+                    "containers": [{"name": "some-ecs-container"}],
+                }
+            ],
+            "failures": [],
+        }
+        mock_executor.attempt_task_runs()
+        assert len(mock_executor.pending_tasks) == 0
+        assert ARN1 in mock_executor.active_workers.get_all_arns()
+
+        mock_executor.ecs.describe_tasks.return_value = {
+            "tasks": [],
+            "failures": [
+                {"arn": ARN1, "reason": "Sample Failure", "detail": "UnitTest 
Failure - Please ignore"}
+            ],
+        }
+
+        # Call sync_running_tasks 2 times with failures.
+        for _ in range(2):
+            mock_executor.sync_running_tasks()
+
+            # Ensure task arn is not removed from active.
+            assert ARN1 in mock_executor.active_workers.get_all_arns()
+
+            # Task is neither failed nor succeeded.
+            fail_mock.assert_not_called()
+            success_mock.assert_not_called()
+
+        # run_task failed twice, and passed once
+        assert mock_executor.ecs.run_task.call_count == 3
+        # describe_tasks failed 2 times so far
+        assert mock_executor.ecs.describe_tasks.call_count == 2
+
+        # 2 run_task failures + 2 describe_task failures = 4 failures
+        # Last call should fail the task.
+        mock_executor.sync_running_tasks()
+        assert ARN1 not in mock_executor.active_workers.get_all_arns()
+        fail_mock.assert_called()
+        success_mock.assert_not_called()
+
+    def test_failed_sync_api_exception(self, mock_executor, caplog):
+        """Test what happens when ECS sync fails for certain tasks 
repeatedly."""
+        self._mock_sync(mock_executor)
+        mock_executor.ecs.describe_tasks.side_effect = Exception("Test 
Exception")
+
+        mock_executor.sync()
+        assert "Failed to sync" in caplog.messages[0]
+
+    @mock.patch.object(BaseExecutor, "fail")
+    @mock.patch.object(BaseExecutor, "success")
+    def test_failed_sync_api(self, success_mock, fail_mock, mock_executor):
+        """Test what happens when ECS sync fails for certain tasks 
repeatedly."""
+        self._mock_sync(mock_executor)
+        mock_executor.ecs.describe_tasks.return_value = {
+            "tasks": [],
+            "failures": [
+                {"arn": ARN1, "reason": "Sample Failure", "detail": "UnitTest 
Failure - Please ignore"}
+            ],
+        }
+
+        # Call Sync 2 times with failures. The task can only fail 
MAX_RUN_TASK_ATTEMPTS times.
+        for check_count in range(1, int(AwsEcsExecutor.MAX_RUN_TASK_ATTEMPTS)):
+            mock_executor.sync_running_tasks()
+            assert mock_executor.ecs.describe_tasks.call_count == check_count
+
+            # Ensure task arn is not removed from active.
+            assert ARN1 in mock_executor.active_workers.get_all_arns()
+
+            # Task is neither failed nor succeeded.
+            fail_mock.assert_not_called()
+            success_mock.assert_not_called()
+
+        # Last call should fail the task.
+        mock_executor.sync_running_tasks()
+        assert ARN1 not in mock_executor.active_workers.get_all_arns()
+        fail_mock.assert_called()
+        success_mock.assert_not_called()
+
+    def test_terminate(self, mock_executor):
+        """Test that executor can shut everything down; forcing all tasks to 
unnaturally exit."""
+        self._mock_sync(mock_executor, State.FAILED)
+
+        mock_executor.terminate()
+
+        mock_executor.ecs.stop_task.assert_called()
+
+    def test_end(self, mock_executor):
+        """Test that executor can end successfully; waiting for all tasks to 
naturally exit."""
+        mock_executor.sync = partial(self._sync_mock_with_call_counts, 
mock_executor.sync)
+
+        self._mock_sync(mock_executor, State.FAILED)
+
+        mock_executor.end(heartbeat_interval=0)
+
+    @mock.patch.object(time, "sleep", return_value=None)
+    def test_end_with_queued_tasks_will_wait(self, _, mock_executor):
+        """Test that executor can end successfully; waiting for all tasks to 
naturally exit."""
+        sync_call_count = 0
+        sync_func = mock_executor.sync
+
+        def sync_mock():
+            """Mock won't work here, because we actually want to call the 
'sync' func."""
+            nonlocal sync_call_count
+            sync_func()
+            sync_call_count += 1
+
+            if sync_call_count == 1:
+                # On the second pass, remove the pending task. This is the 
equivalent of using
+                # mock side_effects to simulate a pending task the first time 
(triggering the
+                # sleep()) and no pending task the second pass, triggering the 
break and allowing
+                # the executor to shut down.
+                mock_executor.active_workers.update_task(
+                    EcsExecutorTask(
+                        ARN2,
+                        "STOPPED",
+                        "STOPPED",
+                        {"exit_code": 0, "name": "some-ecs-container", 
"last_status": "STOPPED"},
+                    )
+                )
+                self.response_task2_json.update({"desiredStatus": "STOPPED", 
"lastStatus": "STOPPED"})
+                mock_executor.ecs.describe_tasks.return_value = {
+                    "tasks": [self.response_task2_json],
+                    "failures": [],
+                }
+
+        mock_executor.sync = sync_mock
+
+        self._add_mock_task(mock_executor, ARN1)
+        self._add_mock_task(mock_executor, ARN2)
+
+        base_response_task_json = {
+            "startedAt": dt.datetime.now(),
+            "containers": [{"name": "some-ecs-container", "lastStatus": 
"STOPPED", "exitCode": 0}],
+        }
+        self.response_task1_json = {
+            "taskArn": ARN1,
+            "desiredStatus": "STOPPED",
+            "lastStatus": "SUCCESS",
+            **base_response_task_json,
+        }
+        self.response_task2_json = {
+            "taskArn": ARN2,
+            "desiredStatus": "QUEUED",
+            "lastStatus": "QUEUED",
+            **base_response_task_json,
+        }
+
+        mock_executor.ecs.describe_tasks.return_value = {
+            "tasks": [self.response_task1_json, self.response_task2_json],
+            "failures": [],
+        }
+
+        mock_executor.end(heartbeat_interval=0)
+
+        assert sync_call_count == 2
+
+    @pytest.mark.parametrize(
+        "bad_config",
+        [
+            pytest.param({"name": "bad_robot"}, 
id="executor_config_can_not_overwrite_name"),
+            pytest.param({"command": "bad_robot"}, 
id="executor_config_can_not_overwrite_command"),
+        ],
+    )
+    def test_executor_config_exceptions(self, bad_config, mock_executor):
+        with pytest.raises(ValueError) as raised:
+            mock_executor.execute_async(mock_airflow_key, mock_cmd, 
executor_config=bad_config)
+
+        assert raised.match('Executor Config should never override "name" or 
"command"')
+        assert 0 == len(mock_executor.pending_tasks)
+
+    @mock.patch.object(ecs_executor_config, "build_task_kwargs")
+    def test_container_not_found(self, mock_build_task_kwargs, mock_executor):
+        mock_build_task_kwargs.return_value({"overrides": {"containerOverrides": [{"name": "foo"}]}})
+
+        with pytest.raises(KeyError) as raised:
+            AwsEcsExecutor()
+        assert raised.match(
+            re.escape(
+                "Rendered JSON template does not contain key "
+                '"overrides[containerOverrides][containers][x][command]"'
+            )
+        )
+        assert 0 == len(mock_executor.pending_tasks)
+
+    @staticmethod
+    def _unset_conf():
+        for env in os.environ:
+            if env.startswith(f"AIRFLOW__{CONFIG_GROUP_NAME.upper()}__"):
+                os.environ.pop(env)
+
+    def _mock_sync(
+        self,
+        executor: AwsEcsExecutor,
+        expected_state: State = State.SUCCESS,
+        set_task_state: State = State.RUNNING,
+    ) -> None:
+        """Mock ECS to the expected state."""
+        self._add_mock_task(executor, ARN1, set_task_state)
+
+        response_task_json = {
+            "taskArn": ARN1,
+            "desiredStatus": "STOPPED",
+            "lastStatus": set_task_state,
+            "containers": [
+                {
+                    "name": "some-ecs-container",
+                    "lastStatus": "STOPPED",
+                    "exitCode": 100 if expected_state in [State.FAILED, 
State.QUEUED] else 0,
+                }
+            ],
+        }
+        if not set_task_state == State.REMOVED:
+            response_task_json["startedAt"] = dt.datetime.now()
+        assert expected_state == 
BotoTaskSchema().load(response_task_json).get_task_state()
+
+        executor.ecs.describe_tasks.return_value = {"tasks": 
[response_task_json], "failures": []}
+
+    @staticmethod
+    def _add_mock_task(executor: AwsEcsExecutor, arn: str, state: State = State.RUNNING):
+        task = mock_task(arn, state)
+        executor.active_workers.add_task(task, mock.Mock(spec=tuple), mock_queue, mock_cmd, mock_config, 1)
+
+    def _sync_mock_with_call_counts(self, sync_func: Callable):
+        """Mock won't work here, because we actually want to call the 'sync' 
func."""
+        # If we call `mock_executor.sync()` here directly we get endless 
recursion below
+        # because we are assigning it to itself with `mock_executor.sync = 
sync_mock`.
+        self.sync_call_count = 0
+
+        sync_func()
+        self.sync_call_count += 1
+
+
+class TestEcsExecutorConfig:
+    @pytest.fixture()
+    def assign_subnets(self):
+        os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.SUBNETS}".upper()] = "sub1,sub2"
+
+    @staticmethod
+    def teardown_method() -> None:
+        for env in os.environ:
+            if env.startswith(f"AIRFLOW__{CONFIG_GROUP_NAME}__".upper()):
+                os.environ.pop(env)
+
+    def test_flatten_dict(self):
+        os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.SUBNETS}".upper()] = "sub1,sub2"
+        nested_dict = {"a": "a", "b": "b", "c": {"d": "d"}}
+
+        assert _recursive_flatten_dict(nested_dict) == {"a": "a", "b": "b", "d": "d"}
+
+    def test_validate_config_defaults(self):
+        """Assert that the defaults stated in the config.yml file match those 
in utils.CONFIG_DEFAULTS."""
+        curr_dir = os.path.dirname(os.path.abspath(__file__))
+        executor_path = "aws/executors/ecs"
+        config_filename = curr_dir.replace("tests", 
"airflow").replace(executor_path, "provider.yaml")
+
+        with open(config_filename) as config:
+            options = 
yaml.safe_load(config)["config"][CONFIG_GROUP_NAME]["options"]
+            file_defaults = {
+                option: default for (option, value) in options.items() if 
(default := value.get("default"))
+            }
+
+        assert len(file_defaults) == len(CONFIG_DEFAULTS)
+        for key in file_defaults.keys():
+            assert file_defaults[key] == CONFIG_DEFAULTS[key]
+
+    def test_subnets_required(self):
+        assert f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.SUBNETS}".upper() not in os.environ
+
+        os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.REGION_NAME}".upper()] = "us-west-1"
+        os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CLUSTER}".upper()] = "some-cluster"
+        os.environ[
+            f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CONTAINER_NAME}".upper()
+        ] = "container-name"
+        os.environ[
+            f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.TASK_DEFINITION}".upper()
+        ] = "some-task-def"
+        os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.LAUNCH_TYPE}".upper()] = "FARGATE"
+        os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.PLATFORM_VERSION}".upper()] = "LATEST"
+        os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.ASSIGN_PUBLIC_IP}".upper()] = "False"
+        os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.SECURITY_GROUPS}".upper()] = "sg1,sg2"
+
+        with pytest.raises(ValueError) as raised:
+            ecs_executor_config.build_task_kwargs()
+        assert raised.match("At least one subnet is required to run a task.")
+
+    def test_config_defaults_are_applied(self, assign_subnets):
+        os.environ[
+            f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CONTAINER_NAME}".upper()
+        ] = "container-name"
+        from airflow.providers.amazon.aws.executors.ecs import ecs_executor_config
+
+        task_kwargs = _recursive_flatten_dict(ecs_executor_config.build_task_kwargs())
+        found_keys = {convert_camel_to_snake(key): key for key in task_kwargs.keys()}
+
+        for expected_key, expected_value in CONFIG_DEFAULTS.items():
+            # "conn_id" and max_run_task_attempts are used by the executor, 
but are not expected to appear
+            # in the task_kwargs.
+            if expected_key in [AllEcsConfigKeys.AWS_CONN_ID, 
AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS]:
+                assert expected_key not in found_keys.keys()
+            else:
+                assert expected_key in found_keys.keys()
+                # Make sure to convert "assign_public_ip" from True/False to ENABLE/DISABLE.
+                if expected_key is AllEcsConfigKeys.ASSIGN_PUBLIC_IP:
+                    expected_value = parse_assign_public_ip(expected_value)
+                assert expected_value == task_kwargs[found_keys[expected_key]]
+
+    def test_provided_values_override_defaults(self, assign_subnets):
+        """
+        Expected precedence is default values are overwritten by values provided explicitly,
+        and those values are overwritten by those provided in run_task_kwargs.
+        """
+        default_version = CONFIG_DEFAULTS[AllEcsConfigKeys.PLATFORM_VERSION]
+        templated_version = "1"
+        first_explicit_version = "2"
+        second_explicit_version = "3"
+
+        run_task_kwargs_env_key = f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.RUN_TASK_KWARGS}".upper()
+        platform_version_env_key = (
+            f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.PLATFORM_VERSION}".upper()
+        )
+        # Required param which doesn't have a default
+        os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CONTAINER_NAME}".upper()] = "foobar"
+
+        # Confirm the default value is applied when no value is provided.
+        assert run_task_kwargs_env_key not in os.environ
+        assert platform_version_env_key not in os.environ
+        from airflow.providers.amazon.aws.executors.ecs import ecs_executor_config
+
+        task_kwargs = ecs_executor_config.build_task_kwargs()
+
+        assert task_kwargs["platformVersion"] == default_version
+
+        # Provide a new value explicitly and assert that it is applied over the default.
+        os.environ[platform_version_env_key] = first_explicit_version
+        task_kwargs = ecs_executor_config.build_task_kwargs()
+
+        assert task_kwargs["platformVersion"] == first_explicit_version
+
+        # Provide a value via template and assert that it is applied over the explicit value.
+        os.environ[run_task_kwargs_env_key] = json.dumps(
+            {AllEcsConfigKeys.PLATFORM_VERSION: templated_version}
+        )
+        task_kwargs = ecs_executor_config.build_task_kwargs()
+
+        assert task_kwargs["platformVersion"] == templated_version
+
+        # Provide a new value explicitly and assert it is not applied over the templated values.
+        os.environ[platform_version_env_key] = second_explicit_version
+        task_kwargs = ecs_executor_config.build_task_kwargs()
+
+        assert task_kwargs["platformVersion"] == templated_version
+
+    def test_count_can_not_be_modified_by_the_user(self, assign_subnets):
+        """The ``count`` parameter must always be 1; verify that the user can 
not override this value."""
+
+        templated_version = "1"
+        templated_cluster = "templated_cluster_name"
+        provided_run_task_kwargs = {
+            AllEcsConfigKeys.PLATFORM_VERSION: templated_version,
+            AllEcsConfigKeys.CLUSTER: templated_cluster,
+            "count": 2,  # The user should not be allowed to overwrite count, 
it must be value of 1
+        }
+
+        run_task_kwargs_env_key = f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.RUN_TASK_KWARGS}".upper()
+        # Required param which doesn't have a default
+        os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CONTAINER_NAME}".upper()] = "foobar"
+
+        # Provide values via task run kwargs template and assert that they are applied,
+        # which verifies that the OTHER values were changed.
+        os.environ[run_task_kwargs_env_key] = json.dumps(provided_run_task_kwargs)
+        task_kwargs = ecs_executor_config.build_task_kwargs()
+        assert task_kwargs["platformVersion"] == templated_version
+        assert task_kwargs["cluster"] == templated_cluster
+
+        # Assert that count was NOT overridden when the others were applied.
+        assert task_kwargs["count"] == 1
+
+    def test_verify_tags_are_used_as_provided(self, assign_subnets):
+        """Confirm that the ``tags`` provided are not converted to 
camelCase."""
+        templated_tags = {"Apache": "Airflow"}
+
+        provided_run_task_kwargs = {
+            "tags": templated_tags,  # The user should be allowed to pass 
arbitrary run task args
+        }
+
+        run_task_kwargs_env_key = f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.RUN_TASK_KWARGS}".upper()
+        # Required param which doesn't have a default
+        os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CONTAINER_NAME}".upper()] = "foobar"
+
+        os.environ[run_task_kwargs_env_key] = json.dumps(provided_run_task_kwargs)
+        task_kwargs = ecs_executor_config.build_task_kwargs()
+
+        # Verify that tag names are exempt from the camel-case conversion.
+        assert task_kwargs["tags"] == templated_tags
+
+    def test_that_provided_kwargs_are_moved_to_correct_nesting(self, assign_subnets):
+        """
+        kwargs such as subnets, security groups,  public ip, and container name are valid run task kwargs,
+        but they are not placed at the root of the kwargs dict, they should be nested in various sub dicts.
+        Ensure we don't leave any behind in the wrong location.
+        """
+        kwargs_to_test = {
+            AllEcsConfigKeys.CONTAINER_NAME: "foobar",
+            AllEcsConfigKeys.ASSIGN_PUBLIC_IP: "True",
+            AllEcsConfigKeys.SECURITY_GROUPS: "sg1,sg2",
+            AllEcsConfigKeys.SUBNETS: "sub1,sub2",
+        }
+        for key, value in kwargs_to_test.items():
+            os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{key}".upper()] = value
+
+        run_task_kwargs = ecs_executor_config.build_task_kwargs()
+        run_task_kwargs_network_config = run_task_kwargs["networkConfiguration"]["awsvpcConfiguration"]
+        for key, value in kwargs_to_test.items():
+            # Assert that the values are not at the root of the kwargs
+            camelized_key = camelize(key, uppercase_first_letter=False)
+
+            assert key not in run_task_kwargs
+            assert camelized_key not in run_task_kwargs
+            if key == AllEcsConfigKeys.CONTAINER_NAME:
+                # The actual ECS run_task_kwarg is "name" not "containerName"
+                assert run_task_kwargs["overrides"]["containerOverrides"][0]["name"] == value
+            elif key == AllEcsConfigKeys.ASSIGN_PUBLIC_IP:
+                # The value for this kwarg is cast from bool to enabled/disabled
+                assert run_task_kwargs_network_config[camelized_key] == "ENABLED"
+            else:
+                assert run_task_kwargs_network_config[camelized_key] == value.split(",")
diff --git a/tests/providers/amazon/aws/hooks/test_base_aws.py b/tests/providers/amazon/aws/hooks/test_base_aws.py
index e584fb98e4..85964433e2 100644
--- a/tests/providers/amazon/aws/hooks/test_base_aws.py
+++ b/tests/providers/amazon/aws/hooks/test_base_aws.py
@@ -41,6 +41,7 @@ from moto.core import DEFAULT_ACCOUNT_ID
 
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.models.connection import Connection
+from airflow.providers.amazon.aws.executors.ecs import AwsEcsExecutor
 from airflow.providers.amazon.aws.hooks.base_aws import (
     AwsBaseHook,
     AwsGenericHook,
@@ -419,7 +420,7 @@ class TestAwsBaseHook:
         return dict(tag.split("/") for tag in user_agent_string.split(" "))
 
     @pytest.mark.parametrize("found_classes", [["RandomOperator"], 
["BaseSensorOperator", "TestSensor"]])
-    @mock.patch.object(AwsBaseHook, "_find_class_name")
+    @mock.patch.object(AwsBaseHook, "_find_operator_class_name")
     def test_user_agent_caller_target_function_found(self, mock_class_name, found_classes):
         mock_class_name.side_effect = found_classes
 
@@ -428,6 +429,21 @@ class TestAwsBaseHook:
         assert mock_class_name.call_count == len(found_classes)
         assert user_agent_tags["Caller"] == found_classes[-1]
 
+    @mock.patch.object(AwsEcsExecutor, "_load_run_kwargs")
+    def test_user_agent_caller_target_executor_found(self, mock_load_run_kwargs):
+        with conf_vars(
+            {
+                ("aws_ecs_executor", "cluster"): "foo",
+                ("aws_ecs_executor", "region_name"): "us-east-1",
+                ("aws_ecs_executor", "container_name"): "bar",
+                ("aws_ecs_executor", "conn_id"): "fish",
+            }
+        ):
+            executor = AwsEcsExecutor()
+
+        user_agent_dict = dict(tag.split("/") for tag in 
executor.ecs.meta.config.user_agent.split(" "))
+        assert user_agent_dict["Caller"] == "AwsEcsExecutor"
+
     def test_user_agent_caller_target_function_not_found(self):
         default_caller_name = "Unknown"
 
