This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ac475f1d8a Add Amazon Bedrock AgentCore Runtime operators (#67984)
0ac475f1d8a is described below

commit 0ac475f1d8a50657ed80b4ef495e02ab2add8c01
Author: Morgan <[email protected]>
AuthorDate: Fri Jun 5 10:20:06 2026 -0300

    Add Amazon Bedrock AgentCore Runtime operators (#67984)
---
 docs/spelling_wordlist.txt                         |   1 +
 providers/amazon/docs/operators/bedrock.rst        |  52 +++++
 .../airflow/providers/amazon/aws/hooks/bedrock.py  |  40 ++++
 .../providers/amazon/aws/operators/bedrock.py      | 247 ++++++++++++++++++++
 .../providers/amazon/aws/triggers/bedrock.py       |  53 ++++-
 .../aws/waiters/bedrock-agentcore-control.json     |  48 ++++
 .../system/amazon/aws/example_bedrock_agentcore.py | 111 +++++++++
 .../tests/unit/amazon/aws/hooks/test_bedrock.py    |   4 +
 .../unit/amazon/aws/operators/test_bedrock.py      | 249 ++++++++++++++++++++-
 .../tests/unit/amazon/aws/triggers/test_bedrock.py |  47 +++-
 .../aws/waiters/test_bedrock_agentcore_control.py  |  74 ++++++
 11 files changed, 923 insertions(+), 3 deletions(-)

diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 7f24b753a17..202d24f00a3 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -23,6 +23,7 @@ adobjects
 AdsInsights
 adsinsights
 afterall
+agentcore
 agentic
 ai
 aio
diff --git a/providers/amazon/docs/operators/bedrock.rst 
b/providers/amazon/docs/operators/bedrock.rst
index 6cfd217b400..bdf628d9f79 100644
--- a/providers/amazon/docs/operators/bedrock.rst
+++ b/providers/amazon/docs/operators/bedrock.rst
@@ -74,6 +74,58 @@ To invoke a Claude Sonnet model using the Messages API you 
would use:
     :start-after: [START howto_operator_invoke_claude_model]
     :end-before: [END howto_operator_invoke_claude_model]
 
+.. _howto/operator:BedrockCreateAgentRuntimeOperator:
+
+Create an Amazon Bedrock AgentCore Runtime
+==========================================
+
+To create an Amazon Bedrock AgentCore Runtime, you can use
+:class:`~airflow.providers.amazon.aws.operators.bedrock.BedrockCreateAgentRuntimeOperator`.
+
+AgentCore Runtime creation is asynchronous. The operator can wait until the 
runtime reaches
+``READY`` either synchronously or in deferrable mode.
+
+.. exampleinclude:: 
/../../amazon/tests/system/amazon/aws/example_bedrock_agentcore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_bedrock_create_agent_runtime]
+    :end-before: [END howto_operator_bedrock_create_agent_runtime]
+
+.. _howto/operator:BedrockInvokeAgentRuntimeOperator:
+
+Invoke an Amazon Bedrock AgentCore Runtime
+==========================================
+
+To invoke an Amazon Bedrock AgentCore Runtime, you can use
+:class:`~airflow.providers.amazon.aws.operators.bedrock.BedrockInvokeAgentRuntimeOperator`.
+
+The operator waits for the runtime invocation response and returns the 
response body along with
+runtime response metadata such as the runtime session ID when present.
+For long-running AgentCore Runtime invocations, configure the botocore client 
``read_timeout``
+through ``botocore_config``.
+
+.. exampleinclude:: 
/../../amazon/tests/system/amazon/aws/example_bedrock_agentcore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_bedrock_invoke_agent_runtime]
+    :end-before: [END howto_operator_bedrock_invoke_agent_runtime]
+
+.. _howto/operator:BedrockDeleteAgentRuntimeOperator:
+
+Delete an Amazon Bedrock AgentCore Runtime
+==========================================
+
+To delete an Amazon Bedrock AgentCore Runtime, you can use
+:class:`~airflow.providers.amazon.aws.operators.bedrock.BedrockDeleteAgentRuntimeOperator`.
+
+The operator accepts the runtime ID, which can be extracted from the ARN 
returned by
+:class:`~airflow.providers.amazon.aws.operators.bedrock.BedrockCreateAgentRuntimeOperator`.
+
+.. exampleinclude:: 
/../../amazon/tests/system/amazon/aws/example_bedrock_agentcore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_bedrock_delete_agent_runtime]
+    :end-before: [END howto_operator_bedrock_delete_agent_runtime]
 
 .. _howto/operator:BedrockCustomizeModelOperator:
 
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/bedrock.py 
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/bedrock.py
index c8fd53a9b7f..9042f0754b7 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/bedrock.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/bedrock.py
@@ -106,3 +106,43 @@ class BedrockAgentRuntimeHook(AwsBaseHook):
     def __init__(self, *args, **kwargs) -> None:
         kwargs["client_type"] = self.client_type
         super().__init__(*args, **kwargs)
+
+
+class BedrockAgentCoreControlHook(AwsBaseHook):
+    """
+    Interact with the Amazon Bedrock AgentCore control plane API.
+
+    Provide thin wrapper around 
:external+boto3:py:class:`boto3.client("bedrock-agentcore-control") 
<BedrockAgentCoreControl.Client>`.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    client_type = "bedrock-agentcore-control"
+
+    def __init__(self, *args, **kwargs) -> None:
+        kwargs["client_type"] = self.client_type
+        super().__init__(*args, **kwargs)
+
+
+class BedrockAgentCoreHook(AwsBaseHook):
+    """
+    Interact with the Amazon Bedrock AgentCore runtime plane API.
+
+    Provide thin wrapper around 
:external+boto3:py:class:`boto3.client("bedrock-agentcore") 
<BedrockAgentCore.Client>`.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    client_type = "bedrock-agentcore"
+
+    def __init__(self, *args, **kwargs) -> None:
+        kwargs["client_type"] = self.client_type
+        super().__init__(*args, **kwargs)
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
index d4c6c7fb36e..fe0cecf6b04 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
@@ -24,6 +24,8 @@ from typing import TYPE_CHECKING, Any, Literal
 from botocore.exceptions import ClientError
 
 from airflow.providers.amazon.aws.hooks.bedrock import (
+    BedrockAgentCoreControlHook,
+    BedrockAgentCoreHook,
     BedrockAgentHook,
     BedrockAgentRuntimeHook,
     BedrockHook,
@@ -31,6 +33,7 @@ from airflow.providers.amazon.aws.hooks.bedrock import (
 )
 from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
 from airflow.providers.amazon.aws.triggers.bedrock import (
+    BedrockAgentRuntimeReadyTrigger,
     BedrockBatchInferenceCompletedTrigger,
     BedrockCustomizeModelCompletedTrigger,
     BedrockIngestionJobTrigger,
@@ -111,6 +114,250 @@ class 
BedrockInvokeModelOperator(AwsBaseOperator[BedrockRuntimeHook]):
         return response_body
 
 
+class 
BedrockCreateAgentRuntimeOperator(AwsBaseOperator[BedrockAgentCoreControlHook]):
+    """
+    Create an Amazon Bedrock AgentCore Runtime.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:BedrockCreateAgentRuntimeOperator`
+
+    :param agent_runtime_name: The name of the AgentCore Runtime. (templated)
+    :param agent_runtime_artifact: The artifact configuration for the 
AgentCore Runtime. (templated)
+    :param role_arn: The ARN of the IAM role for the AgentCore Runtime. 
(templated)
+    :param network_configuration: The network configuration for the AgentCore 
Runtime. (templated)
+    :param create_agent_runtime_kwargs: Any optional parameters to pass to the 
API. (templated)
+    :param wait_for_completion: Whether to wait for the AgentCore Runtime to 
reach READY. (default: True)
+    :param waiter_delay: Time in seconds to wait between status checks. 
(default: 60)
+    :param waiter_max_attempts: Maximum number of attempts to check for 
runtime readiness. (default: 20)
+    :param deferrable: If True, the operator will wait asynchronously for the 
AgentCore Runtime
+        to reach READY. This implies waiting for completion. This mode 
requires aiobotocore
+        module to be installed. (default: False)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. 
If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default 
boto3 behaviour is used.
+    :param verify: Whether or not to verify SSL certificates. See:
+        
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore 
client. See:
+        
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+    """
+
+    aws_hook_class = BedrockAgentCoreControlHook
+    template_fields: Sequence[str] = aws_template_fields(
+        "agent_runtime_name",
+        "agent_runtime_artifact",
+        "role_arn",
+        "network_configuration",
+        "create_agent_runtime_kwargs",
+    )
+
+    def __init__(
+        self,
+        *,
+        agent_runtime_name: str,
+        agent_runtime_artifact: dict[str, Any],
+        role_arn: str,
+        network_configuration: dict[str, Any],
+        create_agent_runtime_kwargs: dict[str, Any] | None = None,
+        wait_for_completion: bool = True,
+        waiter_delay: int = 60,
+        waiter_max_attempts: int = 20,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.agent_runtime_name = agent_runtime_name
+        self.agent_runtime_artifact = agent_runtime_artifact
+        self.role_arn = role_arn
+        self.network_configuration = network_configuration
+        self.create_agent_runtime_kwargs = create_agent_runtime_kwargs or {}
+        self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
+        self.deferrable = deferrable
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None 
= None) -> str:
+        validated_event = validate_execute_complete_event(event)
+
+        if validated_event["status"] != "success":
+            raise RuntimeError(f"Error while creating AgentCore Runtime: 
{validated_event}")
+
+        self.log.info("Bedrock AgentCore Runtime `%s` is ready.", 
validated_event["agent_runtime_arn"])
+        return validated_event["agent_runtime_arn"]
+
+    def execute(self, context: Context) -> str:
+        response = self.hook.conn.create_agent_runtime(
+            agentRuntimeName=self.agent_runtime_name,
+            agentRuntimeArtifact=self.agent_runtime_artifact,
+            roleArn=self.role_arn,
+            networkConfiguration=self.network_configuration,
+            **self.create_agent_runtime_kwargs,
+        )
+        agent_runtime_arn = response["agentRuntimeArn"]
+        agent_runtime_id = response["agentRuntimeId"]
+        agent_runtime_version = response["agentRuntimeVersion"]
+
+        if self.deferrable:
+            self.log.info("Deferring until AgentCore Runtime %s reaches 
READY.", agent_runtime_arn)
+            self.defer(
+                trigger=BedrockAgentRuntimeReadyTrigger(
+                    agent_runtime_id=agent_runtime_id,
+                    agent_runtime_version=agent_runtime_version,
+                    agent_runtime_arn=agent_runtime_arn,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    aws_conn_id=self.aws_conn_id,
+                ),
+                method_name="execute_complete",
+            )
+        elif self.wait_for_completion:
+            self.log.info("Waiting for AgentCore Runtime %s to reach READY.", 
agent_runtime_arn)
+            self.hook.get_waiter("agent_runtime_ready").wait(
+                agentRuntimeId=agent_runtime_id,
+                agentRuntimeVersion=agent_runtime_version,
+                WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": 
self.waiter_max_attempts},
+            )
+
+        return agent_runtime_arn
+
+
+class BedrockInvokeAgentRuntimeOperator(AwsBaseOperator[BedrockAgentCoreHook]):
+    """
+    Invoke an Amazon Bedrock AgentCore Runtime.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:BedrockInvokeAgentRuntimeOperator`
+
+    :param agent_runtime_arn: The ARN of the AgentCore Runtime to invoke. 
(templated)
+    :param payload: The invocation payload. Dict and list payloads are JSON 
serialized. (templated)
+    :param content_type: The MIME type of the input payload. (templated) 
Default: application/json
+    :param accept: The desired MIME type of the response. (templated) Default: 
application/json
+    :param invoke_agent_runtime_kwargs: Any optional parameters to pass to the 
API. (templated)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. 
If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default 
boto3 behaviour is used.
+    :param verify: Whether or not to verify SSL certificates. See:
+        
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore 
client. See:
+        
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+    """
+
+    aws_hook_class = BedrockAgentCoreHook
+    template_fields: Sequence[str] = aws_template_fields(
+        "agent_runtime_arn",
+        "payload",
+        "content_type",
+        "accept",
+        "invoke_agent_runtime_kwargs",
+    )
+
+    def __init__(
+        self,
+        *,
+        agent_runtime_arn: str,
+        payload: dict[str, Any] | list[Any] | str | bytes,
+        content_type: str | None = "application/json",
+        accept: str | None = "application/json",
+        invoke_agent_runtime_kwargs: dict[str, Any] | None = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.agent_runtime_arn = agent_runtime_arn
+        self.payload = payload
+        self.content_type = content_type
+        self.accept = accept
+        self.invoke_agent_runtime_kwargs = invoke_agent_runtime_kwargs or {}
+
+    @staticmethod
+    def _serialize_payload(payload: dict[str, Any] | list[Any] | str | bytes) 
-> bytes:
+        if isinstance(payload, bytes):
+            return payload
+        if isinstance(payload, str):
+            return payload.encode()
+        return json.dumps(payload).encode()
+
+    @staticmethod
+    def _read_response_body(response_body: Any) -> bytes:
+        if hasattr(response_body, "read"):
+            response_body = response_body.read()
+        if isinstance(response_body, bytes):
+            return response_body
+        if isinstance(response_body, str):
+            return response_body.encode()
+        return json.dumps(response_body).encode()
+
+    @staticmethod
+    def _deserialize_response_body(response_body: bytes, content_type: str | 
None) -> Any:
+        try:
+            response_text = response_body.decode()
+        except UnicodeDecodeError:
+            return response_body
+
+        if content_type and "json" in content_type.lower():
+            return json.loads(response_text)
+        return response_text
+
+    def execute(self, context: Context) -> dict[str, Any]:
+        response = self.hook.conn.invoke_agent_runtime(
+            **prune_dict(
+                {
+                    "agentRuntimeArn": self.agent_runtime_arn,
+                    "payload": self._serialize_payload(self.payload),
+                    "contentType": self.content_type,
+                    "accept": self.accept,
+                    **self.invoke_agent_runtime_kwargs,
+                }
+            )
+        )
+        response_body = self._deserialize_response_body(
+            self._read_response_body(response["response"]),
+            response.get("contentType") or self.accept,
+        )
+        return {
+            key: value for key, value in response.items() if key not in 
{"ResponseMetadata", "response"}
+        } | {"response": response_body}
+
+
+class 
BedrockDeleteAgentRuntimeOperator(AwsBaseOperator[BedrockAgentCoreControlHook]):
+    """
+    Delete an Amazon Bedrock AgentCore Runtime.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:BedrockDeleteAgentRuntimeOperator`
+
+    :param agent_runtime_id: The unique identifier of the AgentCore Runtime to 
delete. (templated)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. 
If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default 
boto3 behaviour is used.
+    :param verify: Whether or not to verify SSL certificates. See:
+        
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore 
client. See:
+        
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+    """
+
+    aws_hook_class = BedrockAgentCoreControlHook
+    template_fields: Sequence[str] = aws_template_fields("agent_runtime_id")
+
+    def __init__(self, *, agent_runtime_id: str, **kwargs):
+        super().__init__(**kwargs)
+        self.agent_runtime_id = agent_runtime_id
+
+    def execute(self, context: Context) -> None:
+        
self.hook.conn.delete_agent_runtime(agentRuntimeId=self.agent_runtime_id)
+        self.log.info("Deleted Bedrock AgentCore Runtime %s.", 
self.agent_runtime_id)
+
+
 class BedrockCustomizeModelOperator(AwsBaseOperator[BedrockHook]):
     """
     Create a fine-tuning job to customize a base model.
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/triggers/bedrock.py 
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/bedrock.py
index 70d8254a316..49b4ce17da8 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/bedrock.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/bedrock.py
@@ -18,7 +18,11 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
-from airflow.providers.amazon.aws.hooks.bedrock import BedrockAgentHook, 
BedrockHook
+from airflow.providers.amazon.aws.hooks.bedrock import (
+    BedrockAgentCoreControlHook,
+    BedrockAgentHook,
+    BedrockHook,
+)
 from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
 from airflow.providers.amazon.version_compat import NOTSET, ArgNotSet
 
@@ -185,6 +189,53 @@ class BedrockIngestionJobTrigger(AwsBaseWaiterTrigger):
         return BedrockAgentHook(aws_conn_id=self.aws_conn_id)
 
 
+class BedrockAgentRuntimeReadyTrigger(AwsBaseWaiterTrigger):
+    """
+    Trigger when a Bedrock AgentCore Runtime reaches the READY state.
+
+    :param agent_runtime_id: The unique identifier of the AgentCore Runtime.
+    :param agent_runtime_version: The version of the AgentCore Runtime.
+    :param agent_runtime_arn: The ARN of the AgentCore Runtime.
+    :param waiter_delay: The amount of time in seconds to wait between 
attempts. (default: 60)
+    :param waiter_max_attempts: The maximum number of attempts to be made. 
(default: 20)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    """
+
+    def __init__(
+        self,
+        *,
+        agent_runtime_id: str,
+        agent_runtime_version: str,
+        agent_runtime_arn: str,
+        waiter_delay: int = 60,
+        waiter_max_attempts: int = 20,
+        aws_conn_id: str | None = None,
+    ) -> None:
+        super().__init__(
+            serialized_fields={
+                "agent_runtime_id": agent_runtime_id,
+                "agent_runtime_version": agent_runtime_version,
+                "agent_runtime_arn": agent_runtime_arn,
+            },
+            waiter_name="agent_runtime_ready",
+            waiter_args={
+                "agentRuntimeId": agent_runtime_id,
+                "agentRuntimeVersion": agent_runtime_version,
+            },
+            failure_message="Bedrock AgentCore Runtime creation failed.",
+            status_message="Status of Bedrock AgentCore Runtime is",
+            status_queries=["status"],
+            return_key="agent_runtime_arn",
+            return_value=agent_runtime_arn,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return BedrockAgentCoreControlHook(aws_conn_id=self.aws_conn_id)
+
+
 class BedrockBaseBatchInferenceTrigger(AwsBaseWaiterTrigger):
     """
     Trigger when a batch inference job is complete.
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/waiters/bedrock-agentcore-control.json
 
b/providers/amazon/src/airflow/providers/amazon/aws/waiters/bedrock-agentcore-control.json
new file mode 100644
index 00000000000..0e12b88e3ff
--- /dev/null
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/waiters/bedrock-agentcore-control.json
@@ -0,0 +1,48 @@
+{
+    "version": 2,
+    "waiters": {
+        "agent_runtime_ready": {
+            "delay": 60,
+            "maxAttempts": 20,
+            "operation": "GetAgentRuntime",
+            "acceptors": [
+                {
+                    "matcher": "path",
+                    "argument": "status",
+                    "expected": "READY",
+                    "state": "success"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "status",
+                    "expected": "CREATING",
+                    "state": "retry"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "status",
+                    "expected": "UPDATING",
+                    "state": "retry"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "status",
+                    "expected": "CREATE_FAILED",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "status",
+                    "expected": "UPDATE_FAILED",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "status",
+                    "expected": "DELETING",
+                    "state": "failure"
+                }
+            ]
+        }
+    }
+}
diff --git 
a/providers/amazon/tests/system/amazon/aws/example_bedrock_agentcore.py 
b/providers/amazon/tests/system/amazon/aws/example_bedrock_agentcore.py
new file mode 100644
index 00000000000..ca93029c93c
--- /dev/null
+++ b/providers/amazon/tests/system/amazon/aws/example_bedrock_agentcore.py
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.providers.amazon.aws.operators.bedrock import (
+    BedrockCreateAgentRuntimeOperator,
+    BedrockDeleteAgentRuntimeOperator,
+    BedrockInvokeAgentRuntimeOperator,
+)
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import DAG, chain
+else:
+    from airflow.models.baseoperator import chain  # type: 
ignore[attr-defined,no-redef]
+    from airflow.models.dag import DAG  # type: 
ignore[attr-defined,no-redef,assignment]
+
+try:
+    from airflow.sdk import TriggerRule
+except ImportError:
+    from airflow.utils.trigger_rule import TriggerRule  # type: 
ignore[no-redef,attr-defined]
+
+from system.amazon.aws.utils import SystemTestContextBuilder
+
+# Externally fetched variables:
+ROLE_ARN_KEY = "ROLE_ARN"
+CONTAINER_URI_KEY = "CONTAINER_URI"
+
+sys_test_context_task = (
+    
SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).add_variable(CONTAINER_URI_KEY).build()
+)
+
+DAG_ID = "example_bedrock_agentcore"
+
+with DAG(
+    dag_id=DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as dag:
+    test_context = sys_test_context_task()
+    env_id = test_context["ENV_ID"]
+
+    runtime_name = f"airflow-agentcore-{env_id}"
+
+    # [START howto_operator_bedrock_create_agent_runtime]
+    create_agent_runtime = BedrockCreateAgentRuntimeOperator(
+        task_id="create_agent_runtime",
+        agent_runtime_name=runtime_name,
+        agent_runtime_artifact={
+            "containerConfiguration": {
+                "containerUri": test_context[CONTAINER_URI_KEY],
+            },
+        },
+        role_arn=test_context[ROLE_ARN_KEY],
+        network_configuration={"networkMode": "PUBLIC"},
+        deferrable=True,
+    )
+    # [END howto_operator_bedrock_create_agent_runtime]
+
+    # [START howto_operator_bedrock_invoke_agent_runtime]
+    invoke_agent_runtime = BedrockInvokeAgentRuntimeOperator(
+        task_id="invoke_agent_runtime",
+        agent_runtime_arn=create_agent_runtime.output,
+        payload={"prompt": "Hello from Airflow"},
+        botocore_config={"read_timeout": 300},
+    )
+    # [END howto_operator_bedrock_invoke_agent_runtime]
+
+    # [START howto_operator_bedrock_delete_agent_runtime]
+    delete_agent_runtime = BedrockDeleteAgentRuntimeOperator(
+        task_id="delete_agent_runtime",
+        agent_runtime_id="{{ 
task_instance.xcom_pull('create_agent_runtime').split('/')[-1] }}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END howto_operator_bedrock_delete_agent_runtime]
+
+    chain(
+        # TEST SETUP
+        test_context,
+        # TEST BODY
+        create_agent_runtime,
+        invoke_agent_runtime,
+        # TEST TEARDOWN
+        delete_agent_runtime,
+    )
+
+    from tests_common.test_utils.watcher import watcher
+
+    list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+test_run = get_test_run(dag)
diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_bedrock.py 
b/providers/amazon/tests/unit/amazon/aws/hooks/test_bedrock.py
index 19fbdf41043..b8577a99290 100644
--- a/providers/amazon/tests/unit/amazon/aws/hooks/test_bedrock.py
+++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_bedrock.py
@@ -21,6 +21,8 @@ from unittest import mock
 import pytest
 
 from airflow.providers.amazon.aws.hooks.bedrock import (
+    BedrockAgentCoreControlHook,
+    BedrockAgentCoreHook,
     BedrockAgentHook,
     BedrockAgentRuntimeHook,
     BedrockHook,
@@ -36,6 +38,8 @@ class TestBedrockHooks:
             pytest.param(BedrockRuntimeHook(), "bedrock-runtime", 
id="bedrock-runtime"),
             pytest.param(BedrockAgentHook(), "bedrock-agent", 
id="bedrock-agent"),
             pytest.param(BedrockAgentRuntimeHook(), "bedrock-agent-runtime", 
id="bedrock-agent-runtime"),
+            pytest.param(BedrockAgentCoreControlHook(), 
"bedrock-agentcore-control", id="agentcore-control"),
+            pytest.param(BedrockAgentCoreHook(), "bedrock-agentcore", 
id="agentcore"),
         ],
     )
     def test_bedrock_hooks(self, test_hook, service_name):
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py 
b/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
index e0b6f8f7cbb..8a97f09dc2d 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import json
 from collections.abc import Generator
+from io import BytesIO
 from typing import TYPE_CHECKING
 from unittest import mock
 
@@ -26,9 +27,16 @@ import pytest
 from botocore.exceptions import ClientError
 from moto import mock_aws
 
-from airflow.providers.amazon.aws.hooks.bedrock import BedrockAgentHook, 
BedrockHook, BedrockRuntimeHook
+from airflow.providers.amazon.aws.hooks.bedrock import (
+    BedrockAgentCoreControlHook,
+    BedrockAgentCoreHook,
+    BedrockAgentHook,
+    BedrockHook,
+    BedrockRuntimeHook,
+)
 from airflow.providers.amazon.aws.operators.bedrock import (
     BedrockBatchInferenceOperator,
+    BedrockCreateAgentRuntimeOperator,
     BedrockCreateDataSourceOperator,
     BedrockCreateEvaluationJobOperator,
     BedrockCreateGuardrailOperator,
@@ -36,8 +44,10 @@ from airflow.providers.amazon.aws.operators.bedrock import (
     BedrockCreateKnowledgeBaseOperator,
     BedrockCreateProvisionedModelThroughputOperator,
     BedrockCustomizeModelOperator,
+    BedrockDeleteAgentRuntimeOperator,
     BedrockDeleteGuardrailOperator,
     BedrockIngestDataOperator,
+    BedrockInvokeAgentRuntimeOperator,
     BedrockInvokeModelOperator,
     BedrockRaGOperator,
     BedrockUpdateGuardrailOperator,
@@ -84,6 +94,243 @@ class TestBedrockInvokeModelOperator:
         assert response["generation"] == self.GENERATED_RESPONSE
 
 
+class TestBedrockCreateAgentRuntimeOperator:
+    AGENT_RUNTIME_ARN = 
"arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/test"
+    AGENT_RUNTIME_ID = "runtime_id"
+    AGENT_RUNTIME_VERSION = "1"
+
+    def setup_method(self):
+        self.operator = BedrockCreateAgentRuntimeOperator(
+            task_id="create_agent_runtime",
+            agent_runtime_name="test-runtime",
+            agent_runtime_artifact={"containerConfiguration": {"containerUri": 
"image_uri"}},
+            role_arn="role_arn",
+            network_configuration={"networkMode": "PUBLIC"},
+            create_agent_runtime_kwargs={"description": "test runtime"},
+        )
+        self.operator.defer = mock.MagicMock()
+
+    @pytest.mark.parametrize(
+        ("wait_for_completion", "deferrable"),
+        [
+            pytest.param(False, False, id="no_wait"),
+            pytest.param(True, False, id="wait"),
+            pytest.param(False, True, id="defer"),
+            pytest.param(True, True, id="defer_takes_precedence"),
+        ],
+    )
+    @mock.patch.object(BedrockAgentCoreControlHook, "get_waiter")
+    @mock.patch.object(BedrockAgentCoreControlHook, "conn", 
new_callable=mock.PropertyMock)
+    def test_create_agent_runtime_wait_combinations(
+        self,
+        mock_conn,
+        mock_get_waiter,
+        wait_for_completion,
+        deferrable,
+    ):
+        mock_client = mock.MagicMock()
+        mock_conn.return_value = mock_client
+        mock_client.create_agent_runtime.return_value = {
+            "agentRuntimeArn": self.AGENT_RUNTIME_ARN,
+            "agentRuntimeId": self.AGENT_RUNTIME_ID,
+            "agentRuntimeVersion": self.AGENT_RUNTIME_VERSION,
+            "status": "CREATING",
+        }
+        self.operator.wait_for_completion = wait_for_completion
+        self.operator.deferrable = deferrable
+
+        response = self.operator.execute({})
+
+        assert response == self.AGENT_RUNTIME_ARN
+        mock_client.create_agent_runtime.assert_called_once_with(
+            agentRuntimeName="test-runtime",
+            agentRuntimeArtifact={"containerConfiguration": {"containerUri": 
"image_uri"}},
+            roleArn="role_arn",
+            networkConfiguration={"networkMode": "PUBLIC"},
+            description="test runtime",
+        )
+        assert self.operator.defer.call_count == deferrable
+
+        if wait_for_completion and not deferrable:
+            mock_get_waiter.assert_called_once_with("agent_runtime_ready")
+            mock_get_waiter.return_value.wait.assert_called_once_with(
+                agentRuntimeId=self.AGENT_RUNTIME_ID,
+                agentRuntimeVersion=self.AGENT_RUNTIME_VERSION,
+                WaiterConfig={"Delay": 60, "MaxAttempts": 20},
+            )
+        else:
+            mock_get_waiter.assert_not_called()
+
+    @mock.patch.object(BedrockAgentCoreControlHook, "conn", 
new_callable=mock.PropertyMock)
+    def test_create_agent_runtime_no_extra_kwargs(self, mock_conn):
+        mock_client = mock.MagicMock()
+        mock_conn.return_value = mock_client
+        mock_client.create_agent_runtime.return_value = {
+            "agentRuntimeArn": self.AGENT_RUNTIME_ARN,
+            "agentRuntimeId": self.AGENT_RUNTIME_ID,
+            "agentRuntimeVersion": self.AGENT_RUNTIME_VERSION,
+            "status": "CREATING",
+        }
+        operator = BedrockCreateAgentRuntimeOperator(
+            task_id="create_agent_runtime",
+            agent_runtime_name="test-runtime",
+            agent_runtime_artifact={"containerConfiguration": {"containerUri": 
"image_uri"}},
+            role_arn="role_arn",
+            network_configuration={"networkMode": "PUBLIC"},
+            wait_for_completion=False,
+        )
+
+        operator.execute({})
+
+        mock_client.create_agent_runtime.assert_called_once_with(
+            agentRuntimeName="test-runtime",
+            agentRuntimeArtifact={"containerConfiguration": {"containerUri": 
"image_uri"}},
+            roleArn="role_arn",
+            networkConfiguration={"networkMode": "PUBLIC"},
+        )
+
+    def test_execute_complete_success(self):
+        result = self.operator.execute_complete(
+            {},
+            {"status": "success", "agent_runtime_arn": self.AGENT_RUNTIME_ARN},
+        )
+
+        assert result == self.AGENT_RUNTIME_ARN
+
+    def test_execute_complete_error(self):
+        with pytest.raises(RuntimeError):
+            self.operator.execute_complete(
+                {},
+                {"status": "error", "message": "failed", "agent_runtime_arn": 
self.AGENT_RUNTIME_ARN},
+            )
+
+    def test_template_fields(self):
+        validate_template_fields(self.operator)
+
+
+class TestBedrockInvokeAgentRuntimeOperator:
+    AGENT_RUNTIME_ARN = 
"arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/test"
+
+    @mock.patch.object(BedrockAgentCoreHook, "conn", 
new_callable=mock.PropertyMock)
+    def test_invoke_agent_runtime_json_response(self, mock_conn):
+        mock_client = mock.MagicMock()
+        mock_conn.return_value = mock_client
+        mock_client.invoke_agent_runtime.return_value = {
+            "runtimeSessionId": "session_id",
+            "contentType": "application/json",
+            "statusCode": 200,
+            "response": BytesIO(b'{"answer": "hello"}'),
+            "ResponseMetadata": {"HTTPStatusCode": 200},
+        }
+        operator = BedrockInvokeAgentRuntimeOperator(
+            task_id="invoke_agent_runtime",
+            agent_runtime_arn=self.AGENT_RUNTIME_ARN,
+            payload={"prompt": "hello"},
+            invoke_agent_runtime_kwargs={"runtimeSessionId": "session_id"},
+        )
+
+        response = operator.execute({})
+
+        assert response == {
+            "runtimeSessionId": "session_id",
+            "contentType": "application/json",
+            "statusCode": 200,
+            "response": {"answer": "hello"},
+        }
+        mock_client.invoke_agent_runtime.assert_called_once_with(
+            agentRuntimeArn=self.AGENT_RUNTIME_ARN,
+            payload=b'{"prompt": "hello"}',
+            contentType="application/json",
+            accept="application/json",
+            runtimeSessionId="session_id",
+        )
+
+    @mock.patch.object(BedrockAgentCoreHook, "conn", 
new_callable=mock.PropertyMock)
+    def test_invoke_agent_runtime_text_response(self, mock_conn):
+        mock_client = mock.MagicMock()
+        mock_conn.return_value = mock_client
+        mock_client.invoke_agent_runtime.return_value = {
+            "contentType": "text/plain",
+            "statusCode": 200,
+            "response": BytesIO(b"hello"),
+        }
+        operator = BedrockInvokeAgentRuntimeOperator(
+            task_id="invoke_agent_runtime",
+            agent_runtime_arn=self.AGENT_RUNTIME_ARN,
+            payload="hello",
+            content_type="text/plain",
+            accept="text/plain",
+        )
+
+        response = operator.execute({})
+
+        assert response["response"] == "hello"
+        mock_client.invoke_agent_runtime.assert_called_once_with(
+            agentRuntimeArn=self.AGENT_RUNTIME_ARN,
+            payload=b"hello",
+            contentType="text/plain",
+            accept="text/plain",
+        )
+
+    @mock.patch.object(BedrockAgentCoreHook, "conn", 
new_callable=mock.PropertyMock)
+    def test_invoke_agent_runtime_prunes_none_content_type_and_accept(self, 
mock_conn):
+        mock_client = mock.MagicMock()
+        mock_conn.return_value = mock_client
+        mock_client.invoke_agent_runtime.return_value = {
+            "statusCode": 200,
+            "response": BytesIO(b"hello"),
+        }
+        operator = BedrockInvokeAgentRuntimeOperator(
+            task_id="invoke_agent_runtime",
+            agent_runtime_arn=self.AGENT_RUNTIME_ARN,
+            payload="hello",
+            content_type=None,
+            accept=None,
+        )
+
+        operator.execute({})
+
+        mock_client.invoke_agent_runtime.assert_called_once_with(
+            agentRuntimeArn=self.AGENT_RUNTIME_ARN,
+            payload=b"hello",
+        )
+
+    def test_template_fields(self):
+        operator = BedrockInvokeAgentRuntimeOperator(
+            task_id="invoke_agent_runtime",
+            agent_runtime_arn=self.AGENT_RUNTIME_ARN,
+            payload={"prompt": "hello"},
+        )
+
+        validate_template_fields(operator)
+
+
+class TestBedrockDeleteAgentRuntimeOperator:
+    AGENT_RUNTIME_ID = "runtime_id"
+
+    @mock.patch.object(BedrockAgentCoreControlHook, "conn", 
new_callable=mock.PropertyMock)
+    def test_delete_agent_runtime(self, mock_conn):
+        mock_client = mock.MagicMock()
+        mock_conn.return_value = mock_client
+        mock_client.delete_agent_runtime.return_value = {}
+        operator = BedrockDeleteAgentRuntimeOperator(
+            task_id="delete_agent_runtime",
+            agent_runtime_id=self.AGENT_RUNTIME_ID,
+        )
+
+        operator.execute({})
+
+        
mock_client.delete_agent_runtime.assert_called_once_with(agentRuntimeId=self.AGENT_RUNTIME_ID)
+
+    def test_template_fields(self):
+        validate_template_fields(
+            BedrockDeleteAgentRuntimeOperator(
+                task_id="delete_agent_runtime",
+                agent_runtime_id=self.AGENT_RUNTIME_ID,
+            )
+        )
+
+
 class TestBedrockCustomizeModelOperator:
     CUSTOMIZE_JOB_ARN = "valid_arn"
     CUSTOMIZE_JOB_NAME = "testModelJob"
diff --git a/providers/amazon/tests/unit/amazon/aws/triggers/test_bedrock.py 
b/providers/amazon/tests/unit/amazon/aws/triggers/test_bedrock.py
index 963f96211e7..a5f6936fdc1 100644
--- a/providers/amazon/tests/unit/amazon/aws/triggers/test_bedrock.py
+++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_bedrock.py
@@ -21,8 +21,13 @@ from unittest.mock import AsyncMock
 
 import pytest
 
-from airflow.providers.amazon.aws.hooks.bedrock import BedrockAgentHook, 
BedrockHook
+from airflow.providers.amazon.aws.hooks.bedrock import (
+    BedrockAgentCoreControlHook,
+    BedrockAgentHook,
+    BedrockHook,
+)
 from airflow.providers.amazon.aws.triggers.bedrock import (
+    BedrockAgentRuntimeReadyTrigger,
     BedrockBatchInferenceCompletedTrigger,
     BedrockBatchInferenceScheduledTrigger,
     BedrockCustomizeModelCompletedTrigger,
@@ -173,6 +178,46 @@ class 
TestBedrockIngestionJobTrigger(TestBaseBedrockTrigger):
         mock_get_waiter().wait.assert_called_once()
 
 
+class TestBedrockAgentRuntimeReadyTrigger(TestBaseBedrockTrigger):
+    EXPECTED_WAITER_NAME = "agent_runtime_ready"
+
+    AGENT_RUNTIME_ID = "runtime_id"
+    AGENT_RUNTIME_VERSION = "1"
+    AGENT_RUNTIME_ARN = "runtime_arn"
+
+    def test_serialization(self):
+        """Assert that arguments and classpath are correctly serialized."""
+        trigger = BedrockAgentRuntimeReadyTrigger(
+            agent_runtime_id=self.AGENT_RUNTIME_ID,
+            agent_runtime_version=self.AGENT_RUNTIME_VERSION,
+            agent_runtime_arn=self.AGENT_RUNTIME_ARN,
+        )
+        classpath, kwargs = trigger.serialize()
+        assert classpath == BASE_TRIGGER_CLASSPATH + 
"BedrockAgentRuntimeReadyTrigger"
+        assert kwargs.get("agent_runtime_id") == self.AGENT_RUNTIME_ID
+        assert kwargs.get("agent_runtime_version") == 
self.AGENT_RUNTIME_VERSION
+        assert kwargs.get("agent_runtime_arn") == self.AGENT_RUNTIME_ARN
+
+    @pytest.mark.asyncio
+    @mock.patch.object(BedrockAgentCoreControlHook, "get_waiter")
+    @mock.patch.object(BedrockAgentCoreControlHook, "get_async_conn")
+    async def test_run_success(self, mock_async_conn, mock_get_waiter):
+        mock_async_conn.__aenter__.return_value = mock.MagicMock()
+        mock_get_waiter().wait = AsyncMock()
+        trigger = BedrockAgentRuntimeReadyTrigger(
+            agent_runtime_id=self.AGENT_RUNTIME_ID,
+            agent_runtime_version=self.AGENT_RUNTIME_VERSION,
+            agent_runtime_arn=self.AGENT_RUNTIME_ARN,
+        )
+
+        generator = trigger.run()
+        response = await generator.asend(None)
+
+        assert_expected_waiter_type(mock_get_waiter, self.EXPECTED_WAITER_NAME)
+        assert response == TriggerEvent({"status": "success", 
"agent_runtime_arn": self.AGENT_RUNTIME_ARN})
+        mock_get_waiter().wait.assert_called_once()
+
+
 class TestBedrockBatchInferenceCompletedTrigger(TestBaseBedrockTrigger):
     EXPECTED_WAITER_NAME = "batch_inference_complete"
 
diff --git 
a/providers/amazon/tests/unit/amazon/aws/waiters/test_bedrock_agentcore_control.py
 
b/providers/amazon/tests/unit/amazon/aws/waiters/test_bedrock_agentcore_control.py
new file mode 100644
index 00000000000..cc2cf479126
--- /dev/null
+++ 
b/providers/amazon/tests/unit/amazon/aws/waiters/test_bedrock_agentcore_control.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import boto3
+import botocore
+import pytest
+
+from airflow.providers.amazon.aws.hooks.bedrock import 
BedrockAgentCoreControlHook
+
+
+class TestBedrockAgentCoreControlCustomWaiters:
+    def test_service_waiters(self):
+        assert "agent_runtime_ready" in 
BedrockAgentCoreControlHook().list_waiters()
+
+
+class TestBedrockAgentCoreControlCustomWaitersBase:
+    @pytest.fixture(autouse=True)
+    def mock_conn(self, monkeypatch):
+        self.client = boto3.client("bedrock-agentcore-control", 
region_name="us-east-1")
+        monkeypatch.setattr(BedrockAgentCoreControlHook, "conn", self.client)
+
+
+class 
TestAgentRuntimeReadyWaiter(TestBedrockAgentCoreControlCustomWaitersBase):
+    WAITER_NAME = "agent_runtime_ready"
+    WAITER_ARGS = {"agentRuntimeId": "runtime_id", "agentRuntimeVersion": "1"}
+    SUCCESS_STATES = ["READY"]
+    FAILURE_STATES = ["CREATE_FAILED", "UPDATE_FAILED", "DELETING"]
+    INTERMEDIATE_STATES = ["CREATING", "UPDATING"]
+
+    @pytest.fixture
+    def mock_getter(self):
+        with mock.patch.object(self.client, "get_agent_runtime") as getter:
+            yield getter
+
+    @pytest.mark.parametrize("state", SUCCESS_STATES)
+    def test_agent_runtime_ready_complete(self, state, mock_getter):
+        mock_getter.return_value = {"status": state}
+
+        
BedrockAgentCoreControlHook().get_waiter(self.WAITER_NAME).wait(**self.WAITER_ARGS)
+
+    @pytest.mark.parametrize("state", FAILURE_STATES)
+    def test_agent_runtime_ready_failed(self, state, mock_getter):
+        mock_getter.return_value = {"status": state}
+
+        with pytest.raises(botocore.exceptions.WaiterError):
+            
BedrockAgentCoreControlHook().get_waiter(self.WAITER_NAME).wait(**self.WAITER_ARGS)
+
+    @pytest.mark.parametrize("state", INTERMEDIATE_STATES)
+    def test_agent_runtime_ready_wait(self, state, mock_getter):
+        wait = {"status": state}
+        success = {"status": "READY"}
+        mock_getter.side_effect = [wait, wait, success]
+
+        BedrockAgentCoreControlHook().get_waiter(self.WAITER_NAME).wait(
+            **self.WAITER_ARGS,
+            WaiterConfig={"Delay": 0.01, "MaxAttempts": 3},
+        )


Reply via email to