This is an automated email from the ASF dual-hosted git repository.
o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b951a65c269 Add `OpenSearchServerlessCreateCollectionOperator` (#66549)
b951a65c269 is described below
commit b951a65c2693d07b780cbe35bbcd6f370279a114
Author: John Jackson <[email protected]>
AuthorDate: Tue May 26 13:03:35 2026 -0700
Add `OpenSearchServerlessCreateCollectionOperator` (#66549)
Closes apache#39378
Co-authored-by: John Jackson <[email protected]>
---
.../amazon/docs/operators/opensearchserverless.rst | 18 ++++
providers/amazon/provider.yaml | 3 +
.../amazon/aws/operators/opensearch_serverless.py | 101 +++++++++++++++++++++
.../airflow/providers/amazon/get_provider_info.py | 4 +
.../amazon/aws/example_opensearch_serverless.py | 91 +++++++++++++++++++
.../aws/operators/test_opensearch_serverless.py | 91 +++++++++++++++++++
6 files changed, 308 insertions(+)
diff --git a/providers/amazon/docs/operators/opensearchserverless.rst
b/providers/amazon/docs/operators/opensearchserverless.rst
index e4b392c706c..d4be41da74c 100644
--- a/providers/amazon/docs/operators/opensearchserverless.rst
+++ b/providers/amazon/docs/operators/opensearchserverless.rst
@@ -52,6 +52,24 @@ To wait on the state of an Amazon Bedrock customize model
job until it reaches a
:start-after: [START howto_sensor_opensearch_collection_active]
:end-before: [END howto_sensor_opensearch_collection_active]
+
+Operators
+---------
+
+.. _howto/operator:OpenSearchServerlessCreateCollectionOperator:
+
+Create a Collection
+===================
+
+To create an Amazon OpenSearch Serverless collection, use
+:class:`~airflow.providers.amazon.aws.operators.opensearch_serverless.OpenSearchServerlessCreateCollectionOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_opensearch_serverless.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_opensearch_serverless_create_collection]
+    :end-before: [END howto_operator_opensearch_serverless_create_collection]
+
Reference
---------
diff --git a/providers/amazon/provider.yaml b/providers/amazon/provider.yaml
index f396e2184e5..4c991ef1ffe 100644
--- a/providers/amazon/provider.yaml
+++ b/providers/amazon/provider.yaml
@@ -473,6 +473,9 @@ operators:
- integration-name: Amazon MWAA Serverless
python-modules:
- airflow.providers.amazon.aws.operators.mwaa_serverless
+ - integration-name: Amazon OpenSearch Serverless
+ python-modules:
+ - airflow.providers.amazon.aws.operators.opensearch_serverless
- integration-name: Amazon Simple Storage Service (S3)
python-modules:
- airflow.providers.amazon.aws.operators.s3
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/operators/opensearch_serverless.py
b/providers/amazon/src/airflow/providers/amazon/aws/operators/opensearch_serverless.py
new file mode 100644
index 00000000000..5e5fd158e12
--- /dev/null
+++
b/providers/amazon/src/airflow/providers/amazon/aws/operators/opensearch_serverless.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Amazon OpenSearch Serverless operators."""
+
+from __future__ import annotations
+
+from collections.abc import Sequence
+from typing import TYPE_CHECKING, Any, Literal
+
+from botocore.exceptions import ClientError
+
+from airflow.providers.amazon.aws.hooks.opensearch_serverless import
OpenSearchServerlessHook
+from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
+from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
+from airflow.utils.helpers import prune_dict
+
+if TYPE_CHECKING:
+ from airflow.sdk import Context
+
+
class OpenSearchServerlessCreateCollectionOperator(AwsBaseOperator[OpenSearchServerlessHook]):
    """
    Create an Amazon OpenSearch Serverless collection.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:OpenSearchServerlessCreateCollectionOperator`

    :param collection_name: The name of the collection. (templated)
    :param collection_type: The type of collection (SEARCH, TIMESERIES, VECTORSEARCH).
        Defaults to ``"SEARCH"``. (templated)
    :param description: Optional description. (templated)
    :param standby_replicas: Whether to use standby replicas (ENABLED or DISABLED).
        When not set, the field is omitted from the ``CreateCollection`` request.
    :param tags: Optional list of tag dicts passed through unchanged to ``CreateCollection``,
        e.g. ``[{"key": "team", "value": "search"}]``.
    :param if_exists: Behavior when ``CreateCollection`` fails with ``ConflictException``
        (the collection already exists). ``"fail"`` re-raises the error; ``"skip"``
        (the default) logs, looks the collection up by name and returns its ID.
    :return: The ID of the newly created collection, or of the existing collection
        when ``if_exists="skip"`` and a conflict occurred.
    """

    aws_hook_class = OpenSearchServerlessHook
    template_fields: Sequence[str] = aws_template_fields(
        "collection_name", "collection_type", "description"
    )

    def __init__(
        self,
        *,
        collection_name: str,
        collection_type: str = "SEARCH",
        description: str | None = None,
        standby_replicas: str | None = None,
        tags: list[dict[str, str]] | None = None,
        if_exists: Literal["fail", "skip"] = "skip",
        **kwargs,
    ) -> None:
        # Reject unknown values up front: anything other than "skip" would otherwise
        # silently behave like "fail" when a conflict happens at execution time.
        if if_exists not in ("fail", "skip"):
            raise ValueError(f"if_exists must be 'fail' or 'skip', got {if_exists!r}")
        super().__init__(**kwargs)
        self.collection_name = collection_name
        self.collection_type = collection_type
        self.description = description
        self.standby_replicas = standby_replicas
        self.tags = tags
        self.if_exists = if_exists

    def execute(self, context: Context) -> str:
        self.log.info("Creating OpenSearch Serverless collection %s", self.collection_name)
        # prune_dict drops the optional fields that were left as None so the
        # service applies its own defaults for them.
        request: dict[str, Any] = prune_dict(
            {
                "name": self.collection_name,
                "type": self.collection_type,
                "description": self.description,
                "standbyReplicas": self.standby_replicas,
                "tags": self.tags,
            }
        )
        try:
            response = self.hook.conn.create_collection(**request)
        except ClientError as e:
            is_conflict = e.response["Error"]["Code"] == "ConflictException"
            if not (is_conflict and self.if_exists == "skip"):
                raise
            self.log.info("Collection %s already exists, skipping.", self.collection_name)
            collections = self.hook.conn.batch_get_collection(names=[self.collection_name])
            details = collections.get("collectionDetails", [])
            if not details:
                raise RuntimeError(
                    f"Collection {self.collection_name} reported as existing but not found"
                ) from e
            collection_id = details[0]["id"]
        else:
            collection_id = response["createCollectionDetail"]["id"]
        self.log.info("Collection %s: %s", self.collection_name, collection_id)
        return collection_id
diff --git a/providers/amazon/src/airflow/providers/amazon/get_provider_info.py
b/providers/amazon/src/airflow/providers/amazon/get_provider_info.py
index 99721f3c481..3a407bb1b2f 100644
--- a/providers/amazon/src/airflow/providers/amazon/get_provider_info.py
+++ b/providers/amazon/src/airflow/providers/amazon/get_provider_info.py
@@ -465,6 +465,10 @@ def get_provider_info():
"integration-name": "Amazon MWAA Serverless",
"python-modules":
["airflow.providers.amazon.aws.operators.mwaa_serverless"],
},
+ {
+ "integration-name": "Amazon OpenSearch Serverless",
+ "python-modules":
["airflow.providers.amazon.aws.operators.opensearch_serverless"],
+ },
{
"integration-name": "Amazon Simple Storage Service (S3)",
"python-modules":
["airflow.providers.amazon.aws.operators.s3"],
diff --git
a/providers/amazon/tests/system/amazon/aws/example_opensearch_serverless.py
b/providers/amazon/tests/system/amazon/aws/example_opensearch_serverless.py
new file mode 100644
index 00000000000..66e3a79346a
--- /dev/null
+++ b/providers/amazon/tests/system/amazon/aws/example_opensearch_serverless.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.providers.amazon.aws.operators.opensearch_serverless import (
+ OpenSearchServerlessCreateCollectionOperator,
+)
+from airflow.providers.amazon.aws.sensors.opensearch_serverless import (
+ OpenSearchServerlessCollectionActiveSensor,
+)
+from airflow.providers.common.compat.sdk import DAG, chain
+
+from system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+ from airflow.sdk import TriggerRule, task
+else:
+ from airflow.decorators import task # type: ignore[attr-defined,no-redef]
+ from airflow.utils.trigger_rule import TriggerRule # type:
ignore[no-redef,attr-defined]
+
DAG_ID = "example_opensearch_serverless"

sys_test_context_task = SystemTestContextBuilder().build()


@task
def create_encryption_policy(policy_name: str, collection_name: str) -> None:
    """
    Create an AWS-owned-key encryption policy whose rule matches ``collection_name``.

    OpenSearch Serverless rejects ``CreateCollection`` unless a matching encryption
    policy already exists, so this must run before the collection is created.
    """
    import json

    import boto3

    policy = {
        "Rules": [{"ResourceType": "collection", "Resource": [f"collection/{collection_name}"]}],
        "AWSOwnedKey": True,
    }
    boto3.client("opensearchserverless").create_security_policy(
        name=policy_name,
        type="encryption",
        policy=json.dumps(policy),
    )


@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_collection(collection_id: str):
    """Delete the collection created by the test (runs even if earlier tasks failed)."""
    import boto3

    boto3.client("opensearchserverless").delete_collection(id=collection_id)


@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_encryption_policy(policy_name: str) -> None:
    """Delete the encryption policy created during test setup."""
    import boto3

    boto3.client("opensearchserverless").delete_security_policy(name=policy_name, type="encryption")


with DAG(
    dag_id=DAG_ID,
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    test_context = sys_test_context_task()
    env_id = test_context[ENV_ID_KEY]
    collection_name = f"{env_id}-collection"
    # Same suffix length as the collection name, so both stay within the same
    # naming-length constraints.
    encryption_policy_name = f"{env_id}-encryption"

    # [START howto_operator_opensearch_serverless_create_collection]
    create_collection = OpenSearchServerlessCreateCollectionOperator(
        task_id="create_collection",
        collection_name=collection_name,
        collection_type="SEARCH",
    )
    # [END howto_operator_opensearch_serverless_create_collection]

    wait_for_collection = OpenSearchServerlessCollectionActiveSensor(
        task_id="wait_for_collection",
        collection_name=collection_name,
        poke_interval=30,
        timeout=600,
    )

    chain(
        # TEST SETUP
        test_context,
        create_encryption_policy(encryption_policy_name, collection_name),
        # TEST BODY
        create_collection,
        wait_for_collection,
        # TEST TEARDOWN
        delete_collection(create_collection.output),
        delete_encryption_policy(encryption_policy_name),
    )

    from tests_common.test_utils.watcher import watcher

    list(dag.tasks) >> watcher()

from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

test_run = get_test_run(dag)
diff --git
a/providers/amazon/tests/unit/amazon/aws/operators/test_opensearch_serverless.py
b/providers/amazon/tests/unit/amazon/aws/operators/test_opensearch_serverless.py
new file mode 100644
index 00000000000..c881c91c7ce
--- /dev/null
+++
b/providers/amazon/tests/unit/amazon/aws/operators/test_opensearch_serverless.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+from unittest.mock import MagicMock
+
+import pytest
+from botocore.exceptions import ClientError
+
+from airflow.providers.amazon.aws.hooks.opensearch_serverless import
OpenSearchServerlessHook
+from airflow.providers.amazon.aws.operators.opensearch_serverless import (
+ OpenSearchServerlessCreateCollectionOperator,
+)
+
+from unit.amazon.aws.utils.test_template_fields import validate_template_fields
+
COLLECTION_NAME = "test-collection"
COLLECTION_ID = "abc123def456"


def _client_error(code: str) -> ClientError:
    """Build a botocore ClientError for CreateCollection with the given error code."""
    return ClientError({"Error": {"Code": code, "Message": "error"}}, "CreateCollection")


class TestOpenSearchServerlessCreateCollectionOperator:
    def setup_method(self):
        self.operator = OpenSearchServerlessCreateCollectionOperator(
            task_id="create_collection",
            collection_name=COLLECTION_NAME,
            collection_type="VECTORSEARCH",
        )

    @mock.patch.object(OpenSearchServerlessHook, "conn", new_callable=mock.PropertyMock)
    def test_execute(self, mock_conn):
        mock_client = MagicMock()
        mock_client.create_collection.return_value = {
            "createCollectionDetail": {"id": COLLECTION_ID, "name": COLLECTION_NAME}
        }
        mock_conn.return_value = mock_client

        result = self.operator.execute({})

        mock_client.create_collection.assert_called_once_with(name=COLLECTION_NAME, type="VECTORSEARCH")
        assert result == COLLECTION_ID

    @mock.patch.object(OpenSearchServerlessHook, "conn", new_callable=mock.PropertyMock)
    def test_execute_with_optional_params(self, mock_conn):
        tags = [{"key": "team", "value": "search"}]
        op = OpenSearchServerlessCreateCollectionOperator(
            task_id="create_collection",
            collection_name=COLLECTION_NAME,
            description="my collection",
            standby_replicas="DISABLED",
            tags=tags,
        )
        mock_client = MagicMock()
        mock_client.create_collection.return_value = {"createCollectionDetail": {"id": COLLECTION_ID}}
        mock_conn.return_value = mock_client

        assert op.execute({}) == COLLECTION_ID
        mock_client.create_collection.assert_called_once_with(
            name=COLLECTION_NAME,
            type="SEARCH",
            description="my collection",
            standbyReplicas="DISABLED",
            tags=tags,
        )

    @mock.patch.object(OpenSearchServerlessHook, "conn", new_callable=mock.PropertyMock)
    def test_execute_skip_existing(self, mock_conn):
        mock_client = MagicMock()
        mock_client.create_collection.side_effect = _client_error("ConflictException")
        mock_client.batch_get_collection.return_value = {
            "collectionDetails": [{"id": COLLECTION_ID, "name": COLLECTION_NAME}]
        }
        mock_conn.return_value = mock_client

        result = self.operator.execute({})

        mock_client.create_collection.assert_called_once_with(name=COLLECTION_NAME, type="VECTORSEARCH")
        mock_client.batch_get_collection.assert_called_once_with(names=[COLLECTION_NAME])
        assert result == COLLECTION_ID

    @mock.patch.object(OpenSearchServerlessHook, "conn", new_callable=mock.PropertyMock)
    def test_execute_skip_existing_not_found(self, mock_conn):
        mock_client = MagicMock()
        mock_client.create_collection.side_effect = _client_error("ConflictException")
        mock_client.batch_get_collection.return_value = {"collectionDetails": []}
        mock_conn.return_value = mock_client

        with pytest.raises(RuntimeError, match="reported as existing but not found"):
            self.operator.execute({})

    @mock.patch.object(OpenSearchServerlessHook, "conn", new_callable=mock.PropertyMock)
    def test_execute_reraises_non_conflict_error(self, mock_conn):
        mock_client = MagicMock()
        mock_client.create_collection.side_effect = _client_error("ValidationException")
        mock_conn.return_value = mock_client

        with pytest.raises(ClientError, match="ValidationException"):
            self.operator.execute({})
        # Only a ConflictException may trigger the "skip" lookup.
        mock_client.batch_get_collection.assert_not_called()

    @mock.patch.object(OpenSearchServerlessHook, "conn", new_callable=mock.PropertyMock)
    def test_execute_fail_on_conflict(self, mock_conn):
        op = OpenSearchServerlessCreateCollectionOperator(
            task_id="create_collection",
            collection_name=COLLECTION_NAME,
            if_exists="fail",
        )
        mock_client = MagicMock()
        mock_client.create_collection.side_effect = _client_error("ConflictException")
        mock_conn.return_value = mock_client

        with pytest.raises(ClientError, match="ConflictException"):
            op.execute({})
        mock_client.batch_get_collection.assert_not_called()

    def test_template_fields(self):
        validate_template_fields(self.operator)