This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0a324ba86a8 Add `BedrockCreateEvaluationJobOperator` (#66722)
0a324ba86a8 is described below
commit 0a324ba86a837926bd7d7217115e455e5c37c439
Author: John Jackson <[email protected]>
AuthorDate: Tue May 19 08:52:06 2026 -0700
Add `BedrockCreateEvaluationJobOperator` (#66722)
---
providers/amazon/docs/operators/bedrock.rst | 15 +++
.../providers/amazon/aws/operators/bedrock.py | 56 ++++++++
.../amazon/aws/example_bedrock_evaluation.py | 142 +++++++++++++++++++++
.../unit/amazon/aws/operators/test_bedrock.py | 62 +++++++++
4 files changed, 275 insertions(+)
diff --git a/providers/amazon/docs/operators/bedrock.rst b/providers/amazon/docs/operators/bedrock.rst
index 22c7b23688a..6cfd217b400 100644
--- a/providers/amazon/docs/operators/bedrock.rst
+++ b/providers/amazon/docs/operators/bedrock.rst
@@ -382,6 +382,21 @@ To delete an Amazon Bedrock guardrail, use
:end-before: [END howto_operator_bedrock_delete_guardrail]
+
+.. _howto/operator:BedrockCreateEvaluationJobOperator:
+
+Create an Evaluation Job
+------------------------
+
+To create an Amazon Bedrock model evaluation job, use
+:class:`~airflow.providers.amazon.aws.operators.bedrock.BedrockCreateEvaluationJobOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_bedrock_evaluation.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bedrock_create_evaluation_job]
+ :end-before: [END howto_operator_bedrock_create_evaluation_job]
+
Reference
---------
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
index 3c0319cdd8e..d4c6c7fb36e 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
@@ -1278,3 +1278,59 @@ class BedrockUpdateGuardrailOperator(AwsBaseOperator[BedrockHook]):
response = self.hook.conn.update_guardrail(**kwargs)
self.log.info("Updated guardrail %s version %s",
response["guardrailId"], response["version"])
return response["guardrailId"]
+
+
+class BedrockCreateEvaluationJobOperator(AwsBaseOperator[BedrockHook]):
+    """
+    Create an Amazon Bedrock model evaluation job.
+
+    The operator submits the job and returns immediately with the job ARN;
+    it does not wait for the evaluation to finish.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BedrockCreateEvaluationJobOperator`
+
+    :param job_name: The name of the evaluation job. (templated)
+    :param role_arn: The IAM role ARN for the evaluation job. (templated)
+    :param evaluation_config: The evaluation configuration dict. (templated)
+    :param inference_config: The inference configuration dict. (templated)
+    :param output_data_config: The output data configuration dict. (templated)
+    :param job_description: Optional description. (templated)
+    """
+
+    aws_hook_class = BedrockHook
+    # Every parameter documented as "(templated)" must be listed here; a field that is
+    # missing from this tuple is sent to the API with its Jinja expressions unrendered.
+    template_fields: Sequence[str] = aws_template_fields(
+        "job_name",
+        "role_arn",
+        "evaluation_config",
+        "inference_config",
+        "output_data_config",
+        "job_description",
+    )
+
+    def __init__(
+        self,
+        *,
+        job_name: str,
+        role_arn: str,
+        evaluation_config: dict[str, Any],
+        inference_config: dict[str, Any],
+        output_data_config: dict[str, Any],
+        job_description: str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_name = job_name
+        self.role_arn = role_arn
+        self.evaluation_config = evaluation_config
+        self.inference_config = inference_config
+        self.output_data_config = output_data_config
+        self.job_description = job_description
+
+    def execute(self, context: Context) -> str:
+        """
+        Submit the evaluation job and return its ARN.
+
+        :param context: The Airflow task context (not read by this method).
+        :return: The ``jobArn`` value of the ``create_evaluation_job`` response.
+        """
+        self.log.info("Creating evaluation job %s", self.job_name)
+        # prune_dict drops None-valued entries, so an unset job_description is left
+        # out of the request instead of being sent as a null value.
+        kwargs: dict[str, Any] = prune_dict(
+            {
+                "jobName": self.job_name,
+                "roleArn": self.role_arn,
+                "evaluationConfig": self.evaluation_config,
+                "inferenceConfig": self.inference_config,
+                "outputDataConfig": self.output_data_config,
+                "jobDescription": self.job_description,
+            }
+        )
+        response = self.hook.conn.create_evaluation_job(**kwargs)
+        job_arn = response["jobArn"]
+        self.log.info("Created evaluation job %s: %s", self.job_name, job_arn)
+        return job_arn
diff --git a/providers/amazon/tests/system/amazon/aws/example_bedrock_evaluation.py b/providers/amazon/tests/system/amazon/aws/example_bedrock_evaluation.py
new file mode 100644
index 00000000000..d64700cf02a
--- /dev/null
+++ b/providers/amazon/tests/system/amazon/aws/example_bedrock_evaluation.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.providers.amazon.aws.operators.bedrock import
BedrockCreateEvaluationJobOperator
+from airflow.providers.amazon.aws.operators.s3 import (
+ S3CreateBucketOperator,
+ S3CreateObjectOperator,
+ S3DeleteBucketOperator,
+)
+from airflow.providers.common.compat.sdk import DAG, chain
+
+from system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+ from airflow.sdk import TriggerRule, task
+else:
+ from airflow.decorators import task # type: ignore[attr-defined,no-redef]
+ from airflow.utils.trigger_rule import TriggerRule # type:
ignore[no-redef,attr-defined]
+
+DAG_ID = "example_bedrock_evaluation"
+
+# Key of the extra variable requested from the system-test context; its value is
+# passed to the operator as the evaluation job's IAM role ARN.
+# NOTE(review): presumably resolved by SystemTestContextBuilder from the test environment - confirm.
+ROLE_ARN_KEY = "ROLE_ARN"
+sys_test_context_task =
SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
+
+# Minimal JSONL dataset for Summarization task
+EVAL_DATASET = (
+ '{"prompt": "Summarize: The quick brown fox jumps over the lazy dog.",'
+ ' "referenceResponse": "A fox jumps over a dog."}\n'
+)
+
+# Flow: create a bucket, upload the dataset, start one evaluation job, then tear down.
+with DAG(
+ dag_id=DAG_ID,
+ schedule=None,
+ start_date=datetime(2024, 1, 1),
+ catchup=False,
+) as dag:
+ test_context = sys_test_context_task()
+ env_id = test_context[ENV_ID_KEY]
+ role_arn = test_context[ROLE_ARN_KEY]
+ # NOTE(review): the bucket name is a fixed literal, not derived from env_id, so
+ # every run of this test targets the same bucket - confirm this is intended.
+ bucket_name = "airflow-system-test-bedrock-eval"
+
+ # TEST SETUP
+ create_bucket = S3CreateBucketOperator(
+ task_id="create_bucket",
+ bucket_name=bucket_name,
+ )
+
+ upload_dataset = S3CreateObjectOperator(
+ task_id="upload_dataset",
+ s3_bucket=bucket_name,
+ s3_key="datasets/summarization.jsonl",
+ data=EVAL_DATASET,
+ )
+
+ # TEST BODY
+ # [START howto_operator_bedrock_create_evaluation_job]
+ create_evaluation_job = BedrockCreateEvaluationJobOperator(
+ task_id="create_evaluation_job",
+ job_name=f"{env_id}-eval",
+ role_arn=role_arn,
+ evaluation_config={
+ "automated": {
+ "datasetMetricConfigs": [
+ {
+ "taskType": "Summarization",
+ "dataset": {
+ "name": "eval-dataset",
+ "datasetLocation": {"s3Uri":
f"s3://{bucket_name}/datasets/summarization.jsonl"},
+ },
+ "metricNames": ["Builtin.Accuracy"],
+ }
+ ]
+ }
+ },
+ inference_config={
+ "models": [
+ {
+ "bedrockModel": {
+ "modelIdentifier":
"us.anthropic.claude-haiku-4-5-20251001-v1:0",
+ "inferenceParams": "{}",
+ }
+ }
+ ]
+ },
+ output_data_config={"s3Uri": f"s3://{bucket_name}/output/"},
+ )
+ # [END howto_operator_bedrock_create_evaluation_job]
+
+ # TEST TEARDOWN
+ # Best-effort cleanup: the job created above is asked to stop and any error raised
+ # by that request is deliberately suppressed. The task uses the ALL_DONE trigger
+ # rule, intended to let teardown run even when an upstream task did not succeed.
+ @task(trigger_rule=TriggerRule.ALL_DONE)
+ def stop_evaluation_job(job_arn: str):
+ import contextlib
+
+ import boto3
+
+ with contextlib.suppress(Exception):
+ boto3.client("bedrock").stop_evaluation_job(jobIdentifier=job_arn)
+
+ # Bucket removal also uses ALL_DONE so it is attempted after the stop request.
+ # NOTE(review): force_delete=True presumably removes the bucket's objects first - confirm.
+ delete_bucket = S3DeleteBucketOperator(
+ task_id="delete_bucket",
+ bucket_name=bucket_name,
+ force_delete=True,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ chain(
+ # TEST SETUP
+ test_context,
+ create_bucket,
+ upload_dataset,
+ # TEST BODY
+ create_evaluation_job,
+ # TEST TEARDOWN
+ stop_evaluation_job(create_evaluation_job.output),
+ delete_bucket,
+ )
+
+ from tests_common.test_utils.watcher import watcher
+
+ # Every task feeds the watcher.
+ # NOTE(review): presumably needed so a failed task still marks the run as failed
+ # although the ALL_DONE teardown tasks succeed - confirm against tests_common.
+ list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run # noqa: E402
+
+# NOTE(review): test_run presumably lets pytest execute this example DAG - confirm.
+test_run = get_test_run(dag)
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py b/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
index 7c6b1531fa4..e0b6f8f7cbb 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
@@ -30,6 +30,7 @@ from airflow.providers.amazon.aws.hooks.bedrock import BedrockAgentHook, Bedrock
from airflow.providers.amazon.aws.operators.bedrock import (
BedrockBatchInferenceOperator,
BedrockCreateDataSourceOperator,
+ BedrockCreateEvaluationJobOperator,
BedrockCreateGuardrailOperator,
BedrockCreateGuardrailVersionOperator,
BedrockCreateKnowledgeBaseOperator,
@@ -998,3 +999,64 @@ class TestBedrockUpdateGuardrailOperator:
def test_template_fields(self):
validate_template_fields(self.operator)
+
+
+# Shared fixtures for TestBedrockCreateEvaluationJobOperator.
+EVAL_JOB_NAME = "test-eval-job"
+# ARN the mocked create_evaluation_job call returns; the operator is expected to return it unchanged.
+EVAL_JOB_ARN =
"arn:aws:bedrock:us-east-1:123456789012:evaluation-job/test-eval-job"
+ROLE_ARN = "arn:aws:iam::123456789012:role/test-role"
+# Deliberately minimal payloads: the Bedrock client is mocked in these tests, so the
+# dicts only need to be passed through, not to be complete API requests.
+EVAL_CONFIG = {"automated": {"datasetMetricConfigs": [{"taskType":
"Summarization"}]}}
+INFERENCE_CONFIG = {"models": [{"bedrockModel": {"modelIdentifier":
"anthropic.claude-v2"}}]}
+OUTPUT_CONFIG = {"s3Uri": "s3://bucket/output/"}
+
+
+class TestBedrockCreateEvaluationJobOperator:
+    """Unit tests for ``BedrockCreateEvaluationJobOperator`` run against a mocked Bedrock client."""
+
+    def setup_method(self):
+        # Baseline operator built from the required arguments only (no description).
+        required_args = {
+            "task_id": "create_eval_job",
+            "job_name": EVAL_JOB_NAME,
+            "role_arn": ROLE_ARN,
+            "evaluation_config": EVAL_CONFIG,
+            "inference_config": INFERENCE_CONFIG,
+            "output_data_config": OUTPUT_CONFIG,
+        }
+        self.operator = BedrockCreateEvaluationJobOperator(**required_args)
+
+    @mock.patch.object(BedrockHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute(self, mock_conn):
+        """Without a description, exactly the five required API arguments are sent."""
+        bedrock_client = mock.MagicMock(
+            **{"create_evaluation_job.return_value": {"jobArn": EVAL_JOB_ARN}}
+        )
+        mock_conn.return_value = bedrock_client
+        expected_request = {
+            "jobName": EVAL_JOB_NAME,
+            "roleArn": ROLE_ARN,
+            "evaluationConfig": EVAL_CONFIG,
+            "inferenceConfig": INFERENCE_CONFIG,
+            "outputDataConfig": OUTPUT_CONFIG,
+        }
+
+        returned_arn = self.operator.execute({})
+
+        bedrock_client.create_evaluation_job.assert_called_once_with(**expected_request)
+        assert returned_arn == EVAL_JOB_ARN
+
+    @mock.patch.object(BedrockHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_with_description(self, mock_conn):
+        """A provided description is forwarded to the API as ``jobDescription``."""
+        bedrock_client = mock.MagicMock(
+            **{"create_evaluation_job.return_value": {"jobArn": EVAL_JOB_ARN}}
+        )
+        mock_conn.return_value = bedrock_client
+
+        described_operator = BedrockCreateEvaluationJobOperator(
+            task_id="create_eval_job",
+            job_name=EVAL_JOB_NAME,
+            role_arn=ROLE_ARN,
+            evaluation_config=EVAL_CONFIG,
+            inference_config=INFERENCE_CONFIG,
+            output_data_config=OUTPUT_CONFIG,
+            job_description="Test evaluation",
+        )
+        returned_arn = described_operator.execute({})
+
+        sent_kwargs = bedrock_client.create_evaluation_job.call_args.kwargs
+        assert sent_kwargs["jobDescription"] == "Test evaluation"
+        assert returned_arn == EVAL_JOB_ARN
+
+    def test_template_fields(self):
+        """The operator's declared template fields pass the shared validation helper."""
+        validate_template_fields(self.operator)