This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a324ba86a8 Add `BedrockCreateEvaluationJobOperator` (#66722)
0a324ba86a8 is described below

commit 0a324ba86a837926bd7d7217115e455e5c37c439
Author: John Jackson <[email protected]>
AuthorDate: Tue May 19 08:52:06 2026 -0700

    Add `BedrockCreateEvaluationJobOperator` (#66722)
---
 providers/amazon/docs/operators/bedrock.rst        |  15 +++
 .../providers/amazon/aws/operators/bedrock.py      |  56 ++++++++
 .../amazon/aws/example_bedrock_evaluation.py       | 142 +++++++++++++++++++++
 .../unit/amazon/aws/operators/test_bedrock.py      |  62 +++++++++
 4 files changed, 275 insertions(+)

diff --git a/providers/amazon/docs/operators/bedrock.rst b/providers/amazon/docs/operators/bedrock.rst
index 22c7b23688a..6cfd217b400 100644
--- a/providers/amazon/docs/operators/bedrock.rst
+++ b/providers/amazon/docs/operators/bedrock.rst
@@ -382,6 +382,21 @@ To delete an Amazon Bedrock guardrail, use
     :end-before: [END howto_operator_bedrock_delete_guardrail]
 
 
+
+.. _howto/operator:BedrockCreateEvaluationJobOperator:
+
+Create an Evaluation Job
+------------------------
+
+To create an Amazon Bedrock model evaluation job, use
+:class:`~airflow.providers.amazon.aws.operators.bedrock.BedrockCreateEvaluationJobOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_bedrock_evaluation.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_bedrock_create_evaluation_job]
+    :end-before: [END howto_operator_bedrock_create_evaluation_job]
+
 Reference
 ---------
 
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
index 3c0319cdd8e..d4c6c7fb36e 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
@@ -1278,3 +1278,59 @@ class 
BedrockUpdateGuardrailOperator(AwsBaseOperator[BedrockHook]):
         response = self.hook.conn.update_guardrail(**kwargs)
         self.log.info("Updated guardrail %s version %s", 
response["guardrailId"], response["version"])
         return response["guardrailId"]
+
+
+class BedrockCreateEvaluationJobOperator(AwsBaseOperator[BedrockHook]):
+    """
+    Create an Amazon Bedrock model evaluation job.
+
+    The operator submits the job and returns its ARN; it does not wait for the
+    evaluation to finish.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BedrockCreateEvaluationJobOperator`
+
+    :param job_name: The name of the evaluation job, sent as ``jobName``. (templated)
+    :param role_arn: The IAM role ARN for the evaluation job, sent as ``roleArn``. (templated)
+    :param evaluation_config: The evaluation configuration dict, sent as
+        ``evaluationConfig``. (templated)
+    :param inference_config: The inference configuration dict, sent as
+        ``inferenceConfig``. (templated)
+    :param output_data_config: The output data configuration dict, sent as
+        ``outputDataConfig``. (templated)
+    :param job_description: Optional description, sent as ``jobDescription``;
+        left out of the request when ``None``. (templated)
+    """
+
+    aws_hook_class = BedrockHook
+    # Every parameter documented as "(templated)" must be listed here, otherwise
+    # Jinja expressions (e.g. an XCom-derived S3 URI inside one of the config
+    # dicts) would reach the API unrendered.
+    template_fields: Sequence[str] = aws_template_fields(
+        "job_name",
+        "role_arn",
+        "evaluation_config",
+        "inference_config",
+        "output_data_config",
+        "job_description",
+    )
+
+    def __init__(
+        self,
+        *,
+        job_name: str,
+        role_arn: str,
+        evaluation_config: dict[str, Any],
+        inference_config: dict[str, Any],
+        output_data_config: dict[str, Any],
+        job_description: str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_name = job_name
+        self.role_arn = role_arn
+        self.evaluation_config = evaluation_config
+        self.inference_config = inference_config
+        self.output_data_config = output_data_config
+        self.job_description = job_description
+
+    def execute(self, context: Context) -> str:
+        """Submit the evaluation job and return the ``jobArn`` from the response."""
+        self.log.info("Creating evaluation job %s", self.job_name)
+        # prune_dict drops the optional description when it was not provided.
+        kwargs: dict[str, Any] = prune_dict(
+            {
+                "jobName": self.job_name,
+                "roleArn": self.role_arn,
+                "evaluationConfig": self.evaluation_config,
+                "inferenceConfig": self.inference_config,
+                "outputDataConfig": self.output_data_config,
+                "jobDescription": self.job_description,
+            }
+        )
+        response = self.hook.conn.create_evaluation_job(**kwargs)
+        job_arn = response["jobArn"]
+        self.log.info("Created evaluation job %s: %s", self.job_name, job_arn)
+        return job_arn
diff --git a/providers/amazon/tests/system/amazon/aws/example_bedrock_evaluation.py b/providers/amazon/tests/system/amazon/aws/example_bedrock_evaluation.py
new file mode 100644
index 00000000000..d64700cf02a
--- /dev/null
+++ b/providers/amazon/tests/system/amazon/aws/example_bedrock_evaluation.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.providers.amazon.aws.operators.bedrock import 
BedrockCreateEvaluationJobOperator
+from airflow.providers.amazon.aws.operators.s3 import (
+    S3CreateBucketOperator,
+    S3CreateObjectOperator,
+    S3DeleteBucketOperator,
+)
+from airflow.providers.common.compat.sdk import DAG, chain
+
+from system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import TriggerRule, task
+else:
+    from airflow.decorators import task  # type: ignore[attr-defined,no-redef]
+    from airflow.utils.trigger_rule import TriggerRule  # type: 
ignore[no-redef,attr-defined]
+
+DAG_ID = "example_bedrock_evaluation"
+
+# One-record JSONL dataset (prompt + reference response) for the Summarization task.
+EVAL_DATASET = (
+    '{"prompt": "Summarize: The quick brown fox jumps over the lazy dog.",'
+    ' "referenceResponse": "A fox jumps over a dog."}\n'
+)
+
+# Key under which the test context exposes the IAM role ARN used by the evaluation job.
+ROLE_ARN_KEY = "ROLE_ARN"
+sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
+
+with DAG(
+    dag_id=DAG_ID,
+    schedule=None,
+    start_date=datetime(2024, 1, 1),
+    catchup=False,
+) as dag:
+    test_context = sys_test_context_task()
+    env_id = test_context[ENV_ID_KEY]
+    role_arn = test_context[ROLE_ARN_KEY]
+    # NOTE(review): fixed bucket name. S3 bucket names are global, so parallel runs
+    # can collide; deriving it from env_id requires the operator fields that embed
+    # it (evaluation_config, output_data_config) to be templated.
+    bucket_name = "airflow-system-test-bedrock-eval"
+
+    # TEST SETUP
+    create_bucket = S3CreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=bucket_name,
+    )
+
+    upload_dataset = S3CreateObjectOperator(
+        task_id="upload_dataset",
+        s3_bucket=bucket_name,
+        s3_key="datasets/summarization.jsonl",
+        data=EVAL_DATASET,
+    )
+
+    # TEST BODY
+    # [START howto_operator_bedrock_create_evaluation_job]
+    create_evaluation_job = BedrockCreateEvaluationJobOperator(
+        task_id="create_evaluation_job",
+        job_name=f"{env_id}-eval",
+        role_arn=role_arn,
+        evaluation_config={
+            "automated": {
+                "datasetMetricConfigs": [
+                    {
+                        "taskType": "Summarization",
+                        "dataset": {
+                            "name": "eval-dataset",
+                            "datasetLocation": {"s3Uri": f"s3://{bucket_name}/datasets/summarization.jsonl"},
+                        },
+                        "metricNames": ["Builtin.Accuracy"],
+                    }
+                ]
+            }
+        },
+        inference_config={
+            "models": [
+                {
+                    "bedrockModel": {
+                        "modelIdentifier": "us.anthropic.claude-haiku-4-5-20251001-v1:0",
+                        "inferenceParams": "{}",
+                    }
+                }
+            ]
+        },
+        output_data_config={"s3Uri": f"s3://{bucket_name}/output/"},
+    )
+    # [END howto_operator_bedrock_create_evaluation_job]
+
+    # TEST TEARDOWN
+    @task(trigger_rule=TriggerRule.ALL_DONE)
+    def stop_evaluation_job(job_arn: str | None):
+        """Best-effort stop of the evaluation job created by the test body."""
+        import logging
+
+        import boto3
+
+        log = logging.getLogger(__name__)
+
+        # ALL_DONE means this task also runs when job creation failed, in which
+        # case there may be no ARN to act on.
+        if not job_arn:
+            log.info("No evaluation job ARN available; nothing to stop.")
+            return
+
+        try:
+            boto3.client("bedrock").stop_evaluation_job(jobIdentifier=job_arn)
+        except Exception:
+            # Teardown must never fail the run, but the reason should be visible
+            # in the task log instead of being silently discarded.
+            log.warning("Could not stop evaluation job %s", job_arn, exc_info=True)
+
+    delete_bucket = S3DeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=bucket_name,
+        force_delete=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    chain(
+        # TEST SETUP
+        test_context,
+        create_bucket,
+        upload_dataset,
+        # TEST BODY
+        create_evaluation_job,
+        # TEST TEARDOWN
+        stop_evaluation_job(create_evaluation_job.output),
+        delete_bucket,
+    )
+
+    from tests_common.test_utils.watcher import watcher
+
+    list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+test_run = get_test_run(dag)
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py b/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
index 7c6b1531fa4..e0b6f8f7cbb 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
@@ -30,6 +30,7 @@ from airflow.providers.amazon.aws.hooks.bedrock import 
BedrockAgentHook, Bedrock
 from airflow.providers.amazon.aws.operators.bedrock import (
     BedrockBatchInferenceOperator,
     BedrockCreateDataSourceOperator,
+    BedrockCreateEvaluationJobOperator,
     BedrockCreateGuardrailOperator,
     BedrockCreateGuardrailVersionOperator,
     BedrockCreateKnowledgeBaseOperator,
@@ -998,3 +999,64 @@ class TestBedrockUpdateGuardrailOperator:
 
     def test_template_fields(self):
         validate_template_fields(self.operator)
+
+
+# Shared fixtures for the evaluation-job operator tests.
+EVAL_JOB_NAME = "test-eval-job"
+EVAL_JOB_ARN = "arn:aws:bedrock:us-east-1:123456789012:evaluation-job/test-eval-job"
+ROLE_ARN = "arn:aws:iam::123456789012:role/test-role"
+EVAL_CONFIG = {
+    "automated": {
+        "datasetMetricConfigs": [{"taskType": "Summarization"}],
+    },
+}
+INFERENCE_CONFIG = {
+    "models": [{"bedrockModel": {"modelIdentifier": "anthropic.claude-v2"}}],
+}
+OUTPUT_CONFIG = {"s3Uri": "s3://bucket/output/"}
+
+
+class TestBedrockCreateEvaluationJobOperator:
+    """Unit tests for ``BedrockCreateEvaluationJobOperator``."""
+
+    def setup_method(self):
+        # Baseline operator built without the optional description.
+        self.operator = BedrockCreateEvaluationJobOperator(
+            task_id="create_eval_job",
+            job_name=EVAL_JOB_NAME,
+            role_arn=ROLE_ARN,
+            evaluation_config=EVAL_CONFIG,
+            inference_config=INFERENCE_CONFIG,
+            output_data_config=OUTPUT_CONFIG,
+        )
+
+    @mock.patch.object(BedrockHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute(self, mock_conn):
+        """Only the required arguments are sent and the job ARN is returned."""
+        client = mock.MagicMock()
+        client.create_evaluation_job.return_value = {"jobArn": EVAL_JOB_ARN}
+        mock_conn.return_value = client
+
+        job_arn = self.operator.execute({})
+
+        assert job_arn == EVAL_JOB_ARN
+        client.create_evaluation_job.assert_called_once_with(
+            jobName=EVAL_JOB_NAME,
+            roleArn=ROLE_ARN,
+            evaluationConfig=EVAL_CONFIG,
+            inferenceConfig=INFERENCE_CONFIG,
+            outputDataConfig=OUTPUT_CONFIG,
+        )
+
+    @mock.patch.object(BedrockHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_with_description(self, mock_conn):
+        """A provided description is forwarded as ``jobDescription``."""
+        client = mock.MagicMock()
+        client.create_evaluation_job.return_value = {"jobArn": EVAL_JOB_ARN}
+        mock_conn.return_value = client
+
+        operator = BedrockCreateEvaluationJobOperator(
+            task_id="create_eval_job",
+            job_name=EVAL_JOB_NAME,
+            role_arn=ROLE_ARN,
+            evaluation_config=EVAL_CONFIG,
+            inference_config=INFERENCE_CONFIG,
+            output_data_config=OUTPUT_CONFIG,
+            job_description="Test evaluation",
+        )
+        job_arn = operator.execute({})
+
+        sent_kwargs = client.create_evaluation_job.call_args.kwargs
+        assert sent_kwargs["jobDescription"] == "Test evaluation"
+        assert job_arn == EVAL_JOB_ARN
+
+    def test_template_fields(self):
+        validate_template_fields(self.operator)

Reply via email to