This is an automated email from the ASF dual-hosted git repository.

o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b951a65c269 Add `OpenSearchServerlessCreateCollectionOperator` (#66549)
b951a65c269 is described below

commit b951a65c2693d07b780cbe35bbcd6f370279a114
Author: John Jackson <[email protected]>
AuthorDate: Tue May 26 13:03:35 2026 -0700

    Add `OpenSearchServerlessCreateCollectionOperator` (#66549)
    
    Closes apache#39378
    
    Co-authored-by: John Jackson <[email protected]>
---
 .../amazon/docs/operators/opensearchserverless.rst |  18 ++++
 providers/amazon/provider.yaml                     |   3 +
 .../amazon/aws/operators/opensearch_serverless.py  | 101 +++++++++++++++++++++
 .../airflow/providers/amazon/get_provider_info.py  |   4 +
 .../amazon/aws/example_opensearch_serverless.py    |  91 +++++++++++++++++++
 .../aws/operators/test_opensearch_serverless.py    |  91 +++++++++++++++++++
 6 files changed, 308 insertions(+)

diff --git a/providers/amazon/docs/operators/opensearchserverless.rst b/providers/amazon/docs/operators/opensearchserverless.rst
index e4b392c706c..d4be41da74c 100644
--- a/providers/amazon/docs/operators/opensearchserverless.rst
+++ b/providers/amazon/docs/operators/opensearchserverless.rst
@@ -52,6 +52,24 @@ To wait on the state of an Amazon Bedrock customize model job until it reaches a
     :start-after: [START howto_sensor_opensearch_collection_active]
     :end-before: [END howto_sensor_opensearch_collection_active]
 
+
+Operators
+---------
+
+.. _howto/operator:OpenSearchServerlessCreateCollectionOperator:
+
+Create a Collection
+===================
+
+To create an Amazon OpenSearch Serverless collection, use
+:class:`~airflow.providers.amazon.aws.operators.opensearch_serverless.OpenSearchServerlessCreateCollectionOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_opensearch_serverless.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_opensearch_serverless_create_collection]
+    :end-before: [END howto_operator_opensearch_serverless_create_collection]
+
 Reference
 ---------
 
diff --git a/providers/amazon/provider.yaml b/providers/amazon/provider.yaml
index f396e2184e5..4c991ef1ffe 100644
--- a/providers/amazon/provider.yaml
+++ b/providers/amazon/provider.yaml
@@ -473,6 +473,9 @@ operators:
   - integration-name: Amazon MWAA Serverless
     python-modules:
       - airflow.providers.amazon.aws.operators.mwaa_serverless
+  - integration-name: Amazon OpenSearch Serverless
+    python-modules:
+      - airflow.providers.amazon.aws.operators.opensearch_serverless
   - integration-name: Amazon Simple Storage Service (S3)
     python-modules:
       - airflow.providers.amazon.aws.operators.s3
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/opensearch_serverless.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/opensearch_serverless.py
new file mode 100644
index 00000000000..5e5fd158e12
--- /dev/null
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/opensearch_serverless.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Amazon OpenSearch Serverless operators."""
+
+from __future__ import annotations
+
+from collections.abc import Sequence
+from typing import TYPE_CHECKING, Any, Literal
+
+from botocore.exceptions import ClientError
+
+from airflow.providers.amazon.aws.hooks.opensearch_serverless import 
OpenSearchServerlessHook
+from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
+from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
+from airflow.utils.helpers import prune_dict
+
+if TYPE_CHECKING:
+    from airflow.sdk import Context
+
+
+class OpenSearchServerlessCreateCollectionOperator(AwsBaseOperator[OpenSearchServerlessHook]):
+    """
+    Create an Amazon OpenSearch Serverless collection.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:OpenSearchServerlessCreateCollectionOperator`
+
+    :param collection_name: The name of the collection. (templated)
+    :param collection_type: The type of collection (SEARCH, TIMESERIES, VECTORSEARCH). (templated)
+    :param description: Optional description. (templated)
+    :param standby_replicas: Whether to use standby replicas (ENABLED or DISABLED).
+        Omitted from the request when ``None``.
+    :param tags: Optional list of tags, each a ``{"key": ..., "value": ...}`` dict as accepted
+        by the ``CreateCollection`` API. Omitted from the request when ``None``.
+    :param if_exists: Behavior when the collection already exists.
+        ``"fail"`` re-raises the ``ConflictException``; ``"skip"`` (the default) logs and
+        returns the ID of the existing collection. Any other value raises ``ValueError``.
+    :return: The ID of the created collection, or of the already existing one when skipped.
+    """
+
+    aws_hook_class = OpenSearchServerlessHook
+    template_fields: Sequence[str] = aws_template_fields("collection_name", "collection_type", "description")
+
+    def __init__(
+        self,
+        *,
+        collection_name: str,
+        collection_type: str = "SEARCH",
+        description: str | None = None,
+        standby_replicas: str | None = None,
+        tags: list[dict[str, str]] | None = None,
+        if_exists: Literal["fail", "skip"] = "skip",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        # Validate eagerly: in ``execute`` any value other than "skip" would silently act like "fail".
+        if if_exists not in ("fail", "skip"):
+            raise ValueError(f"Invalid value for if_exists: {if_exists!r}. Expected 'fail' or 'skip'.")
+        self.collection_name = collection_name
+        self.collection_type = collection_type
+        self.description = description
+        self.standby_replicas = standby_replicas
+        self.tags = tags
+        self.if_exists = if_exists
+
+    def execute(self, context: Context) -> str:
+        self.log.info("Creating OpenSearch Serverless collection %s", self.collection_name)
+        # prune_dict drops unset (None) optional arguments so they are not sent to the API.
+        kwargs: dict[str, Any] = prune_dict(
+            {
+                "name": self.collection_name,
+                "type": self.collection_type,
+                "description": self.description,
+                "standbyReplicas": self.standby_replicas,
+                "tags": self.tags,
+            }
+        )
+        try:
+            response = self.hook.conn.create_collection(**kwargs)
+        except ClientError as e:
+            # Only an "already exists" conflict is tolerated, and only when asked to skip.
+            if e.response["Error"]["Code"] != "ConflictException" or self.if_exists != "skip":
+                raise
+            self.log.info("Collection %s already exists, skipping.", self.collection_name)
+            # The failed create call yields no details, so look the existing collection up by name.
+            collections = self.hook.conn.batch_get_collection(names=[self.collection_name])
+            details = collections.get("collectionDetails", [])
+            if not details:
+                raise RuntimeError(
+                    f"Collection {self.collection_name} reported as existing but not found"
+                ) from e
+            collection_id = details[0]["id"]
+        else:
+            collection_id = response["createCollectionDetail"]["id"]
+        self.log.info("Collection %s: %s", self.collection_name, collection_id)
+        return collection_id
diff --git a/providers/amazon/src/airflow/providers/amazon/get_provider_info.py b/providers/amazon/src/airflow/providers/amazon/get_provider_info.py
index 99721f3c481..3a407bb1b2f 100644
--- a/providers/amazon/src/airflow/providers/amazon/get_provider_info.py
+++ b/providers/amazon/src/airflow/providers/amazon/get_provider_info.py
@@ -465,6 +465,10 @@ def get_provider_info():
                 "integration-name": "Amazon MWAA Serverless",
                 "python-modules": 
["airflow.providers.amazon.aws.operators.mwaa_serverless"],
             },
+            {
+                "integration-name": "Amazon OpenSearch Serverless",
+                "python-modules": 
["airflow.providers.amazon.aws.operators.opensearch_serverless"],
+            },
             {
                 "integration-name": "Amazon Simple Storage Service (S3)",
                 "python-modules": 
["airflow.providers.amazon.aws.operators.s3"],
diff --git a/providers/amazon/tests/system/amazon/aws/example_opensearch_serverless.py b/providers/amazon/tests/system/amazon/aws/example_opensearch_serverless.py
new file mode 100644
index 00000000000..66e3a79346a
--- /dev/null
+++ b/providers/amazon/tests/system/amazon/aws/example_opensearch_serverless.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.providers.amazon.aws.operators.opensearch_serverless import (
+    OpenSearchServerlessCreateCollectionOperator,
+)
+from airflow.providers.amazon.aws.sensors.opensearch_serverless import (
+    OpenSearchServerlessCollectionActiveSensor,
+)
+from airflow.providers.common.compat.sdk import DAG, chain
+
+from system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import TriggerRule, task
+else:
+    from airflow.decorators import task  # type: ignore[attr-defined,no-redef]
+    from airflow.utils.trigger_rule import TriggerRule  # type: 
ignore[no-redef,attr-defined]
+
+DAG_ID = "example_opensearch_serverless"
+
+sys_test_context_task = SystemTestContextBuilder().build()
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_collection(collection_id: str):
+    """
+    Teardown: delete the collection created by this system test.
+
+    Uses ``TriggerRule.ALL_DONE`` so it is scheduled once all upstream tasks finish,
+    whatever their final state.
+    """
+    import boto3
+
+    serverless_client = boto3.client("opensearchserverless")
+    serverless_client.delete_collection(id=collection_id)
+
+
+# System test: create a collection, wait for it to become active, then delete it.
+with DAG(
+    dag_id=DAG_ID,
+    schedule=None,
+    start_date=datetime(2024, 1, 1),
+    catchup=False,
+) as dag:
+    test_context = sys_test_context_task()
+    env_id = test_context[ENV_ID_KEY]
+    # NOTE(review): OpenSearch Serverless restricts collection names (length and allowed
+    # characters); presumably ENV_ID keeps "<env_id>-collection" within those limits — confirm.
+    collection_name = f"{env_id}-collection"
+
+    # [START howto_operator_opensearch_serverless_create_collection]
+    create_collection = OpenSearchServerlessCreateCollectionOperator(
+        task_id="create_collection",
+        collection_name=collection_name,
+        collection_type="SEARCH",
+    )
+    # [END howto_operator_opensearch_serverless_create_collection]
+
+    # Wait for the new collection to become active (poll every 30 s, fail after 600 s).
+    wait_for_collection = OpenSearchServerlessCollectionActiveSensor(
+        task_id="wait_for_collection",
+        collection_name=collection_name,
+        poke_interval=30,
+        timeout=600,
+    )
+
+    # The teardown receives the collection ID returned by create_collection via its XCom output.
+    chain(
+        # TEST SETUP
+        test_context,
+        # TEST BODY
+        create_collection,
+        wait_for_collection,
+        # TEST TEARDOWN
+        delete_collection(create_collection.output),
+    )
+
+    from tests_common.test_utils.watcher import watcher
+
+    # This test needs the watcher to properly mark success/failure, because the teardown task
+    # (trigger rule ALL_DONE) is part of the DAG.
+    list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Exposes this example DAG to pytest as a runnable system test.
+test_run = get_test_run(dag)
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_opensearch_serverless.py b/providers/amazon/tests/unit/amazon/aws/operators/test_opensearch_serverless.py
new file mode 100644
index 00000000000..c881c91c7ce
--- /dev/null
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_opensearch_serverless.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+from unittest.mock import MagicMock
+
+import pytest
+from botocore.exceptions import ClientError
+
+from airflow.providers.amazon.aws.hooks.opensearch_serverless import 
OpenSearchServerlessHook
+from airflow.providers.amazon.aws.operators.opensearch_serverless import (
+    OpenSearchServerlessCreateCollectionOperator,
+)
+
+from unit.amazon.aws.utils.test_template_fields import validate_template_fields
+
+# Fixture values shared by the tests below: the name passed to the operator and the ID
+# returned by the mocked OpenSearch Serverless client.
+COLLECTION_NAME = "test-collection"
+COLLECTION_ID = "abc123def456"
+
+
+class TestOpenSearchServerlessCreateCollectionOperator:
+    """Unit tests for OpenSearchServerlessCreateCollectionOperator against a mocked boto3 client."""
+
+    def setup_method(self):
+        self.operator = OpenSearchServerlessCreateCollectionOperator(
+            task_id="create_collection",
+            collection_name=COLLECTION_NAME,
+            collection_type="VECTORSEARCH",
+        )
+
+    @mock.patch.object(OpenSearchServerlessHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute(self, mock_conn):
+        mock_client = MagicMock()
+        mock_client.create_collection.return_value = {
+            "createCollectionDetail": {"id": COLLECTION_ID, "name": COLLECTION_NAME}
+        }
+        mock_conn.return_value = mock_client
+
+        result = self.operator.execute({})
+
+        # Unset optional arguments must be pruned rather than sent as None.
+        mock_client.create_collection.assert_called_once_with(name=COLLECTION_NAME, type="VECTORSEARCH")
+        assert result == COLLECTION_ID
+
+    @mock.patch.object(OpenSearchServerlessHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_with_optional_params(self, mock_conn):
+        tags = [{"key": "env", "value": "test"}]
+        op = OpenSearchServerlessCreateCollectionOperator(
+            task_id="create_collection",
+            collection_name=COLLECTION_NAME,
+            description="test description",
+            standby_replicas="DISABLED",
+            tags=tags,
+        )
+        mock_client = MagicMock()
+        mock_client.create_collection.return_value = {
+            "createCollectionDetail": {"id": COLLECTION_ID, "name": COLLECTION_NAME}
+        }
+        mock_conn.return_value = mock_client
+
+        result = op.execute({})
+
+        mock_client.create_collection.assert_called_once_with(
+            name=COLLECTION_NAME,
+            type="SEARCH",
+            description="test description",
+            standbyReplicas="DISABLED",
+            tags=tags,
+        )
+        assert result == COLLECTION_ID
+
+    @mock.patch.object(OpenSearchServerlessHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_skip_existing(self, mock_conn):
+        mock_client = MagicMock()
+        mock_client.create_collection.side_effect = ClientError(
+            {"Error": {"Code": "ConflictException", "Message": "exists"}}, "CreateCollection"
+        )
+        mock_client.batch_get_collection.return_value = {
+            "collectionDetails": [{"id": COLLECTION_ID, "name": COLLECTION_NAME}]
+        }
+        mock_conn.return_value = mock_client
+
+        result = self.operator.execute({})
+
+        mock_client.create_collection.assert_called_once_with(name=COLLECTION_NAME, type="VECTORSEARCH")
+        mock_client.batch_get_collection.assert_called_once_with(names=[COLLECTION_NAME])
+        assert result == COLLECTION_ID
+
+    @mock.patch.object(OpenSearchServerlessHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_skip_existing_not_found(self, mock_conn):
+        mock_client = MagicMock()
+        mock_client.create_collection.side_effect = ClientError(
+            {"Error": {"Code": "ConflictException", "Message": "exists"}}, "CreateCollection"
+        )
+        mock_client.batch_get_collection.return_value = {"collectionDetails": []}
+        mock_conn.return_value = mock_client
+
+        with pytest.raises(RuntimeError, match="reported as existing but not found"):
+            self.operator.execute({})
+
+    @mock.patch.object(OpenSearchServerlessHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_fail_on_conflict(self, mock_conn):
+        op = OpenSearchServerlessCreateCollectionOperator(
+            task_id="create_collection",
+            collection_name=COLLECTION_NAME,
+            if_exists="fail",
+        )
+        mock_client = MagicMock()
+        mock_client.create_collection.side_effect = ClientError(
+            {"Error": {"Code": "ConflictException", "Message": "exists"}}, "CreateCollection"
+        )
+        mock_conn.return_value = mock_client
+
+        with pytest.raises(ClientError, match="ConflictException"):
+            op.execute({})
+        mock_client.batch_get_collection.assert_not_called()
+
+    @mock.patch.object(OpenSearchServerlessHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_reraises_other_client_errors(self, mock_conn):
+        mock_client = MagicMock()
+        mock_client.create_collection.side_effect = ClientError(
+            {"Error": {"Code": "ValidationException", "Message": "invalid"}}, "CreateCollection"
+        )
+        mock_conn.return_value = mock_client
+
+        # Even with the default if_exists="skip", only ConflictException is tolerated.
+        with pytest.raises(ClientError, match="ValidationException"):
+            self.operator.execute({})
+        mock_client.batch_get_collection.assert_not_called()
+
+    def test_template_fields(self):
+        validate_template_fields(self.operator)

Reply via email to