This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a37777567 Glue `DataBrew` operator (#34807)
4a37777567 is described below

commit 4a377775672b7148e8935e20844e7a0ba491bdd8
Author: ellisms <114107920+elli...@users.noreply.github.com>
AuthorDate: Mon Oct 16 16:29:05 2023 -0400

    Glue `DataBrew` operator (#34807)
---
 .../providers/amazon/aws/hooks/glue_databrew.py    |  68 +++++++++++++
 .../amazon/aws/operators/glue_databrew.py          | 110 +++++++++++++++++++++
 .../providers/amazon/aws/triggers/glue_databrew.py |  59 +++++++++++
 airflow/providers/amazon/aws/waiters/databrew.json |  36 +++++++
 airflow/providers/amazon/provider.yaml             |  15 +++
 .../operators/glue.rst                             |   1 +
 .../operators/glue_databrew.rst                    |  53 ++++++++++
 .../integration-logos/aws/AWS-Glue-DataBrew_64.png | Bin 0 -> 14575 bytes
 .../amazon/aws/hooks/test_glue_databrew.py         |  38 +++++++
 .../amazon/aws/operators/test_glue_databrew.py     |  58 +++++++++++
 .../amazon/aws/triggers/test_glue_databrew.py      |  46 +++++++++
 .../amazon/aws/waiters/test_glue_databrew.py       |  68 +++++++++++++
 .../providers/amazon/aws/example_glue_databrew.py  |  55 +++++++++++
 13 files changed, 607 insertions(+)

diff --git a/airflow/providers/amazon/aws/hooks/glue_databrew.py b/airflow/providers/amazon/aws/hooks/glue_databrew.py
new file mode 100644
index 0000000000..f6c7b3ebd6
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/glue_databrew.py
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class GlueDataBrewHook(AwsBaseHook):
+    """
+    Interact with AWS DataBrew.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        - :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args, **kwargs):
+        kwargs["client_type"] = "databrew"
+        super().__init__(*args, **kwargs)
+
+    def job_completion(self, job_name: str, run_id: str, delay: int = 10, max_attempts: int = 60) -> str:
+        """
+        Wait until Glue DataBrew job reaches terminal status.
+
+        :param job_name: The name of the job being processed during this run.
+        :param run_id: The unique identifier of the job run.
+        :param delay: Time in seconds to delay between polls.
+        :param max_attempts: Maximum number of attempts to poll for completion.
+        :return: job status
+        """
+        self.get_waiter("job_complete").wait(
+            Name=job_name,
+            RunId=run_id,
+            WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
+        )
+
+        status = self.get_job_state(job_name, run_id)
+        return status
+
+    def get_job_state(self, job_name: str, run_id: str) -> str:
+        """
+        Get the status of a job run.
+
+        :param job_name: The name of the job being processed during this run.
+        :param run_id: The unique identifier of the job run.
+        :return: State of the job run.
+            'STARTING'|'RUNNING'|'STOPPING'|'STOPPED'|'SUCCEEDED'|'FAILED'|'TIMEOUT'
+        """
+        response = self.conn.describe_job_run(Name=job_name, RunId=run_id)
+        return response["State"]
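For readers skimming the change, here is a minimal usage sketch of the new hook (not part of the diff; the job name and connection id are placeholders):

    from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook

    hook = GlueDataBrewHook(aws_conn_id="aws_default")
    # start_job_run comes from the underlying boto3 "databrew" client
    run = hook.conn.start_job_run(Name="my-databrew-job")  # placeholder job name
    # job_completion polls the custom "job_complete" waiter until a terminal
    # state is reached, then returns that state via get_job_state()
    status = hook.job_completion(job_name="my-databrew-job", run_id=run["RunId"], delay=10, max_attempts=60)
    print(status)  # e.g. 'SUCCEEDED'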
diff --git a/airflow/providers/amazon/aws/operators/glue_databrew.py b/airflow/providers/amazon/aws/operators/glue_databrew.py
new file mode 100644
index 0000000000..596a507397
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/glue_databrew.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Sequence
+
+from airflow.configuration import conf
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook
+from airflow.providers.amazon.aws.triggers.glue_databrew import GlueDataBrewJobCompleteTrigger
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class GlueDataBrewStartJobOperator(BaseOperator):
+    """
+    Start an AWS Glue DataBrew job.
+
+    AWS Glue DataBrew is a visual data preparation tool that makes it easier
+    for data analysts and data scientists to clean and normalize data
+    to prepare it for analytics and machine learning (ML).
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GlueDataBrewStartJobOperator`
+
+    :param job_name: unique job name per AWS Account
+    :param wait_for_completion: Whether to wait for job run completion. (default: True)
+    :param deferrable: If True, the operator will wait asynchronously for the job to complete.
+        This implies waiting for completion. This mode requires aiobotocore module to be installed.
+        (default: False)
+    :param delay: Time in seconds to wait between status checks. Default is 30.
+    :return: dictionary with key run_id and value of the resulting job's run_id.
+    """
+
+    template_fields: Sequence[str] = (
+        "job_name",
+        "wait_for_completion",
+        "delay",
+        "deferrable",
+    )
+
+    def __init__(
+        self,
+        job_name: str,
+        wait_for_completion: bool = True,
+        delay: int = 30,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.job_name = job_name
+        self.wait_for_completion = wait_for_completion
+        self.deferrable = deferrable
+        self.delay = delay
+        self.aws_conn_id = aws_conn_id
+
+    @cached_property
+    def hook(self) -> GlueDataBrewHook:
+        return GlueDataBrewHook(aws_conn_id=self.aws_conn_id)
+
+    def execute(self, context: Context):
+        job = self.hook.conn.start_job_run(Name=self.job_name)
+        run_id = job["RunId"]
+
+        self.log.info("AWS Glue DataBrew Job: %s. Run Id: %s submitted.", 
self.job_name, run_id)
+
+        if self.deferrable:
+            self.log.info("Deferring job %s with run_id %s", self.job_name, 
run_id)
+            self.defer(
+                trigger=GlueDataBrewJobCompleteTrigger(
+                    aws_conn_id=self.aws_conn_id, job_name=self.job_name, 
run_id=run_id, delay=self.delay
+                ),
+                method_name="execute_complete",
+            )
+
+        elif self.wait_for_completion:
+            self.log.info(
+                "Waiting for AWS Glue DataBrew Job: %s. Run Id: %s to 
complete.", self.job_name, run_id
+            )
+            status = self.hook.job_completion(job_name=self.job_name, 
delay=self.delay, run_id=run_id)
+            self.log.info("Glue DataBrew Job: %s status: %s", self.job_name, 
status)
+
+        return {"run_id": run_id}
+
+    def execute_complete(self, context: Context, event=None) -> dict[str, str]:
+        run_id = event.get("run_id", "")
+        status = event.get("status", "")
+
+        self.log.info("AWS Glue DataBrew runID: %s completed with status: %s", 
run_id, status)
+
+        return {"run_id": run_id}
diff --git a/airflow/providers/amazon/aws/triggers/glue_databrew.py b/airflow/providers/amazon/aws/triggers/glue_databrew.py
new file mode 100644
index 0000000000..595b653e2f
--- /dev/null
+++ b/airflow/providers/amazon/aws/triggers/glue_databrew.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook
+from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
+
+
+class GlueDataBrewJobCompleteTrigger(AwsBaseWaiterTrigger):
+    """
+    Watches for a Glue DataBrew job, triggers when it finishes.
+
+    :param job_name: Glue DataBrew job name
+    :param run_id: the ID of the specific run to watch for that job
+    :param delay: Number of seconds to wait between two checks. Default is 10 seconds.
+    :param max_attempts: Maximum number of attempts to wait for the job to complete. Default is 60 attempts.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    """
+
+    def __init__(
+        self,
+        job_name: str,
+        run_id: str,
+        aws_conn_id: str,
+        delay: int = 10,
+        max_attempts: int = 60,
+        **kwargs,
+    ):
+        super().__init__(
+            serialized_fields={"job_name": job_name, "run_id": run_id},
+            waiter_name="job_complete",
+            waiter_args={"Name": job_name, "RunId": run_id},
+            failure_message=f"Error while waiting for job {job_name} with run id {run_id} to complete",
+            status_message=f"Run id: {run_id}",
+            status_queries=["State"],
+            return_value=run_id,
+            return_key="run_id",
+            waiter_delay=delay,
+            waiter_max_attempts=max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> GlueDataBrewHook:
+        return GlueDataBrewHook(aws_conn_id=self.aws_conn_id)
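A quick sketch of the trigger's serialization contract, which the triggerer process relies on to re-create the trigger (values here are illustrative only):

    from airflow.providers.amazon.aws.triggers.glue_databrew import GlueDataBrewJobCompleteTrigger

    trigger = GlueDataBrewJobCompleteTrigger(job_name="my-databrew-job", run_id="abc123", aws_conn_id="aws_default")
    classpath, kwargs = trigger.serialize()
    # classpath points back at this module's class; kwargs carry the
    # serialized_fields plus the waiter settings, roughly:
    # {"job_name": "my-databrew-job", "run_id": "abc123", "waiter_delay": 10, ...}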
diff --git a/airflow/providers/amazon/aws/waiters/databrew.json b/airflow/providers/amazon/aws/waiters/databrew.json
new file mode 100644
index 0000000000..41372def5b
--- /dev/null
+++ b/airflow/providers/amazon/aws/waiters/databrew.json
@@ -0,0 +1,36 @@
+{
+    "version": 2,
+    "waiters": {
+        "job_complete": {
+            "operation": "DescribeJobRun",
+            "delay": 30,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "matcher": "path",
+                    "argument": "State",
+                    "expected": "STOPPED",
+                    "state": "success"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "State",
+                    "expected": "SUCCEEDED",
+                    "state": "success"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "State",
+                    "expected": "FAILED",
+                    "state": "success"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "State",
+                    "expected": "TIMEOUT",
+                    "state": "success"
+                }
+            ]
+        }
+    }
+}
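Note that all four terminal states are "success" acceptors, so wait() returns rather than raising and the caller inspects the final state itself, as the hook's job_completion() does. A hedged sketch of reaching this waiter directly (placeholder names):

    from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook

    hook = GlueDataBrewHook(aws_conn_id="aws_default")
    waiter = hook.get_waiter("job_complete")
    # WaiterConfig overrides the JSON defaults of delay=30 / maxAttempts=60
    waiter.wait(Name="my-databrew-job", RunId="abc123", WaiterConfig={"Delay": 10, "MaxAttempts": 60})
    print(hook.get_job_state("my-databrew-job", "abc123"))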
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 74ada23355..0949232bd2 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -289,6 +289,12 @@ integrations:
     how-to-guide:
       - /docs/apache-airflow-providers-amazon/operators/appflow.rst
     tags: [aws]
+  - integration-name: AWS Glue DataBrew
+    external-doc-url: https://docs.aws.amazon.com/databrew/latest/dg/what-is.html
+    how-to-guide:
+      - /docs/apache-airflow-providers-amazon/operators/glue_databrew.rst
+    logo: /integration-logos/aws/AWS-Glue-DataBrew_64.png
+    tags: [aws]
 
 operators:
   - integration-name: Amazon Athena
@@ -365,6 +371,9 @@ operators:
   - integration-name: Amazon Appflow
     python-modules:
       - airflow.providers.amazon.aws.operators.appflow
+  - integration-name: AWS Glue DataBrew
+    python-modules:
+      - airflow.providers.amazon.aws.operators.glue_databrew
 
 sensors:
   - integration-name: Amazon Athena
@@ -541,6 +550,9 @@ hooks:
   - integration-name: Amazon Appflow
     python-modules:
       - airflow.providers.amazon.aws.hooks.appflow
+  - integration-name: AWS Glue DataBrew
+    python-modules:
+      - airflow.providers.amazon.aws.hooks.glue_databrew
 
 triggers:
   - integration-name: Amazon Web Services
@@ -589,6 +601,9 @@ triggers:
   - integration-name: Amazon Simple Queue Service (SQS)
     python-modules:
       - airflow.providers.amazon.aws.triggers.sqs
+  - integration-name: AWS Glue DataBrew
+    python-modules:
+      - airflow.providers.amazon.aws.triggers.glue_databrew
 
 transfers:
   - source-integration-name: Amazon DynamoDB
diff --git a/docs/apache-airflow-providers-amazon/operators/glue.rst b/docs/apache-airflow-providers-amazon/operators/glue.rst
index e582e9d415..ddd21205c1 100644
--- a/docs/apache-airflow-providers-amazon/operators/glue.rst
+++ b/docs/apache-airflow-providers-amazon/operators/glue.rst
@@ -105,3 +105,4 @@ Reference
 
 * `AWS boto3 library documentation for Glue <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html>`__
 * `Glue IAM Role creation <https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html>`__
+* `AWS boto3 library documentation for Glue DataBrew <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/databrew.html>`__
diff --git a/docs/apache-airflow-providers-amazon/operators/glue_databrew.rst b/docs/apache-airflow-providers-amazon/operators/glue_databrew.rst
new file mode 100644
index 0000000000..d14f898486
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/glue_databrew.rst
@@ -0,0 +1,53 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+=================
+AWS Glue DataBrew
+=================
+
+`AWS Glue DataBrew <https://aws.amazon.com/glue/features/databrew/>`__ is a visual data preparation tool
+that makes it easier for data analysts and data scientists to clean and normalize data to prepare it
+for analytics and machine learning (ML). You can choose from over 250 prebuilt transformations to automate
+data preparation tasks, all without the need to write any code. You can automate filtering anomalies,
+converting data to standard formats, and correcting invalid values, among other tasks.
+After your data is ready, you can immediately use it for analytics and ML projects.
+
+Prerequisite Tasks
+------------------
+
+.. include:: ../_partials/prerequisite_tasks.rst
+
+Operators
+---------
+
+.. _howto/operator:GlueDataBrewStartJobOperator:
+
+Start an AWS Glue DataBrew job
+==============================
+
+To submit a new AWS Glue DataBrew job, you can use :class:`~airflow.providers.amazon.aws.operators.glue_databrew.GlueDataBrewStartJobOperator`.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_glue_databrew.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_glue_databrew_start]
+    :end-before: [END howto_operator_glue_databrew_start]
+
+Reference
+---------
+
+* `AWS boto3 library documentation for Glue DataBrew <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/databrew.html>`__
diff --git a/docs/integration-logos/aws/AWS-Glue-DataBrew_64.png b/docs/integration-logos/aws/AWS-Glue-DataBrew_64.png
new file mode 100644
index 0000000000..9821fcdf26
Binary files /dev/null and b/docs/integration-logos/aws/AWS-Glue-DataBrew_64.png differ
diff --git a/tests/providers/amazon/aws/hooks/test_glue_databrew.py b/tests/providers/amazon/aws/hooks/test_glue_databrew.py
new file mode 100644
index 0000000000..e3ea047f90
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_glue_databrew.py
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+from unittest import mock
+
+from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook
+
+if TYPE_CHECKING:
+    from unittest.mock import MagicMock
+
+
+class TestGlueDataBrewHook:
+    job_name = "test-databrew-job"
+    runId = "test12345"
+
+    @mock.patch.object(GlueDataBrewHook, "get_job_state")
+    def test_get_job_state(self, get_job_state_mock: MagicMock):
+        get_job_state_mock.return_value = "SUCCEEDED"
+        hook = GlueDataBrewHook()
+        result = hook.get_job_state(self.job_name, self.runId)
+        assert result == "SUCCEEDED"
diff --git a/tests/providers/amazon/aws/operators/test_glue_databrew.py b/tests/providers/amazon/aws/operators/test_glue_databrew.py
new file mode 100644
index 0000000000..571e4816b5
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_glue_databrew.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+from moto import mock_databrew
+
+from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook
+from airflow.providers.amazon.aws.operators.glue_databrew import GlueDataBrewStartJobOperator
+
+JOB_NAME = "test_job"
+
+
+@pytest.fixture
+def hook() -> GlueDataBrewHook:
+    with mock_databrew():
+        yield GlueDataBrewHook(aws_conn_id="aws_default")
+
+
+class TestGlueDataBrewOperator:
+    @mock.patch.object(GlueDataBrewHook, "conn")
+    @mock.patch.object(GlueDataBrewHook, "get_waiter")
+    def test_start_job_wait_for_completion(self, mock_hook_get_waiter, mock_conn):
+        TEST_RUN_ID = "12345"
+        operator = GlueDataBrewStartJobOperator(
+            task_id="task_test", job_name=JOB_NAME, wait_for_completion=True, 
aws_conn_id="aws_default"
+        )
+        mock_conn.start_job_run.return_value = {"RunId": TEST_RUN_ID}
+        operator.execute(None)
+        mock_hook_get_waiter.assert_called_once_with("job_complete")
+
+    @mock.patch.object(GlueDataBrewHook, "conn")
+    @mock.patch.object(GlueDataBrewHook, "get_waiter")
+    def test_start_job_no_wait(self, mock_hook_get_waiter, mock_conn):
+        TEST_RUN_ID = "12345"
+        operator = GlueDataBrewStartJobOperator(
+            task_id="task_test", job_name=JOB_NAME, wait_for_completion=False, 
aws_conn_id="aws_default"
+        )
+        mock_conn.start_job_run.return_value = {"RunId": TEST_RUN_ID}
+        operator.execute(None)
+        mock_hook_get_waiter.assert_not_called()
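The deferrable path is not covered above; a hedged sketch of how it could be exercised (an editorial addition, not part of this commit): execute() should raise TaskDeferred carrying the DataBrew trigger.

    import pytest
    from unittest import mock
    from airflow.exceptions import TaskDeferred
    from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook
    from airflow.providers.amazon.aws.operators.glue_databrew import GlueDataBrewStartJobOperator
    from airflow.providers.amazon.aws.triggers.glue_databrew import GlueDataBrewJobCompleteTrigger

    @mock.patch.object(GlueDataBrewHook, "conn")
    def test_start_job_deferrable(mock_conn):
        mock_conn.start_job_run.return_value = {"RunId": "12345"}
        operator = GlueDataBrewStartJobOperator(task_id="task_test", job_name="test_job", deferrable=True)
        # defer() raises TaskDeferred, handing control to the triggerer
        with pytest.raises(TaskDeferred) as deferred:
            operator.execute(None)
        assert isinstance(deferred.value.trigger, GlueDataBrewJobCompleteTrigger)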
diff --git a/tests/providers/amazon/aws/triggers/test_glue_databrew.py b/tests/providers/amazon/aws/triggers/test_glue_databrew.py
new file mode 100644
index 0000000000..7352fffcd8
--- /dev/null
+++ b/tests/providers/amazon/aws/triggers/test_glue_databrew.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.amazon.aws.triggers.glue_databrew import GlueDataBrewJobCompleteTrigger
+
+TEST_JOB_NAME = "test_job_name"
+TEST_JOB_RUN_ID = "a1234"
+TEST_JOB_RUN_STATUS = "SUCCEEDED"
+
+
+@pytest.fixture
+def trigger():
+    yield GlueDataBrewJobCompleteTrigger(
+        aws_conn_id="aws_default", job_name=TEST_JOB_NAME, 
run_id=TEST_JOB_RUN_ID
+    )
+
+
+class TestGlueDataBrewJobCompleteTrigger:
+    def test_serialize(self, trigger):
+        class_path, args = trigger.serialize()
+
+        class_name = class_path.split(".")[-1]
+        clazz = globals()[class_name]
+        instance = clazz(**args)
+
+        class_path2, args2 = instance.serialize()
+
+        assert class_path == class_path2
+        assert args == args2
diff --git a/tests/providers/amazon/aws/waiters/test_glue_databrew.py b/tests/providers/amazon/aws/waiters/test_glue_databrew.py
new file mode 100644
index 0000000000..2393898102
--- /dev/null
+++ b/tests/providers/amazon/aws/waiters/test_glue_databrew.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest import mock
+
+import boto3
+import pytest
+
+from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook
+
+RUNNING_STATES = ["STARTING", "RUNNING", "STOPPING"]
+TERMINAL_STATES = ["STOPPED", "SUCCEEDED", "FAILED"]
+
+
+class TestCustomDataBrewWaiters:
+    """Test waiters from ``amazon/aws/waiters/glue.json``."""
+
+    JOB_NAME = "test_job"
+    RUN_ID = "123"
+
+    @pytest.fixture(autouse=True)
+    def setup_test_cases(self, monkeypatch):
+        self.client = boto3.client("databrew", region_name="eu-west-3")
+        monkeypatch.setattr(GlueDataBrewHook, "conn", self.client)
+
+    def test_service_waiters(self):
+        hook_waiters = GlueDataBrewHook(aws_conn_id=None).list_waiters()
+        assert "job_complete" in hook_waiters
+
+    @pytest.fixture
+    def mock_describe_job_runs(self):
+        """Mock ``GlueDataBrewHook.Client.describe_job_run`` method."""
+        with mock.patch.object(self.client, "describe_job_run") as m:
+            yield m
+
+    @staticmethod
+    def describe_jobs(status: str):
+        """
+        Helper function to generate a minimal DescribeJobRun response for a single job.
+
+        https://docs.aws.amazon.com/databrew/latest/dg/API_DescribeJobRun.html
+        """
+        return {"State": status}
+
+    def test_job_succeeded(self, mock_describe_job_runs):
+        """Test job succeeded"""
+        mock_describe_job_runs.side_effect = [
+            self.describe_jobs(RUNNING_STATES[1]),
+            self.describe_jobs(TERMINAL_STATES[1]),
+        ]
+        waiter = GlueDataBrewHook(aws_conn_id=None).get_waiter("job_complete")
+        waiter.wait(Name=self.JOB_NAME, RunId=self.RUN_ID, WaiterConfig={"Delay": 0.2, "MaxAttempts": 2})
diff --git a/tests/system/providers/amazon/aws/example_glue_databrew.py b/tests/system/providers/amazon/aws/example_glue_databrew.py
new file mode 100644
index 0000000000..08625b5611
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_glue_databrew.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pendulum
+
+from airflow.models.baseoperator import chain
+from airflow.models.dag import DAG
+from airflow.providers.amazon.aws.operators.glue_databrew import (
+    GlueDataBrewStartJobOperator,
+)
+from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder
+
+DAG_ID = "example_databrew"
+
+sys_test_context_task = SystemTestContextBuilder().build()
+
+
+with DAG(DAG_ID, schedule="@once", start_date=pendulum.datetime(2023, 1, 1, 
tz="UTC"), catchup=False) as dag:
+    test_context = sys_test_context_task()
+    env_id = test_context["ENV_ID"]
+
+    job_name = f"{env_id}-databrew-job"
+
+    # [START howto_operator_glue_databrew_start]
+    start_job = GlueDataBrewStartJobOperator(task_id="startjob", 
deferrable=True, job_name=job_name, delay=15)
+    # [END howto_operator_glue_databrew_start]
+
+    chain(test_context, start_job)
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
