ferruzzi commented on code in PR #34807:
URL: https://github.com/apache/airflow/pull/34807#discussion_r1353056130


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -422,3 +422,50 @@ def create_or_update_glue_job(self) -> str | None:
             self.conn.create_job(**config)
 
         return self.job_name
+
+
+class GlueDataBrewHook(AwsBaseHook):
+    """
+    Interact with AWS DataBrew.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        - :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args, **kwargs):
+        kwargs["client_type"] = "databrew"
+        super().__init__(*args, **kwargs)
+
+    def job_completion(self, job_name: str, run_id: str, delay: int = 10, max_attempts: int = 60) -> str:
+        """
+        Wait until a Glue DataBrew job run reaches a terminal status.
+
+        :param job_name: The name of the job being processed during this run.
+        :param run_id: The unique identifier of the job run.
+        :param delay: Time in seconds to delay between polls.
+        :param max_attempts: Maximum number of attempts to poll for completion.
+        :return: job status
+        """
+        self.get_waiter("job_complete").wait(
+            Name=job_name,
+            RunId=run_id,
+            WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
+        )
+
+        status = self.get_job_state(job_name, run_id)
+        return status
+
+    def get_job_state(self, job_name: str, run_id: str) -> str:
+        """
+        Get the status of a job run.
+
+        :param job_name: The name of the job being processed during this run.
+        :param run_id: The unique identifier of the job run.
+        :return: State of the job run.
+            'STARTING'|'RUNNING'|'STOPPING'|'STOPPED'|'SUCCEEDED'|'FAILED'|'TIMEOUT'
+        """
+        response = self.get_conn().describe_job_run(Name=job_name, RunId=run_id)

Review Comment:
   (Possibly elsewhere as well.) The preference now is to use the `conn` cached property over the old `get_conn()` helper.
   ```suggestion
           response = self.conn.describe_job_run(Name=job_name, RunId=run_id)
   ```
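   
   For anyone following along: `conn` on `AwsBaseHook` is a cached property that lazily builds and reuses the boto3 client, so it's a drop-in replacement for `get_conn()`. A minimal sketch of the two call styles (`aws_default` is a hypothetical connection id):
   
   ```python
   from airflow.providers.amazon.aws.hooks.glue import GlueDataBrewHook
   
   # Both return the same boto3 "databrew" client; `conn` caches it on the
   # hook instance, so repeated accesses reuse one client.
   hook = GlueDataBrewHook(aws_conn_id="aws_default")  # hypothetical conn id
   client = hook.conn        # preferred: cached property
   client = hook.get_conn()  # legacy helper, same client underneath
   ```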



##########
airflow/providers/amazon/aws/operators/glue.py:
##########
@@ -230,3 +230,98 @@ def on_kill(self):
             )
             if not response["SuccessfulSubmissions"]:
                 self.log.error("Failed to stop AWS Glue Job: %s. Run Id: %s", 
self.job_name, self._job_run_id)
+
+
+class GlueDataBrewStartJobOperator(BaseOperator):
+    """Start an AWS Glue DataBrew job.

Review Comment:
   ```suggestion
       """
       Start an AWS Glue DataBrew job.
   ```



##########
tests/providers/amazon/aws/waiters/test_custom_waiters.py:
##########
@@ -424,3 +425,42 @@ def test_steps_failed(self, mock_list_steps):
                 StepIds=[self.STEP_ID1, self.STEP_ID2],
                 WaiterConfig={"Delay": 0.01, "MaxAttempts": 3},
             )

Review Comment:
   Not needed for this PR, but I think custom waiters now have enough traction that these tests should be split into one file per service, like everywhere else. I don't want you to move the existing tests in this PR, but I'll leave it up to you whether to add yours here or start the process by putting yours in a new file.
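   
   If you do start the split, a minimal sketch of what a per-service file could look like (hypothetical path and class name, mirroring the class added below):
   
   ```python
   # tests/providers/amazon/aws/waiters/test_databrew.py  (hypothetical new file)
   from __future__ import annotations
   
   from airflow.providers.amazon.aws.hooks.glue import GlueDataBrewHook
   
   
   class TestDataBrewCustomWaiters:
       """Test waiters from ``amazon/aws/waiters/databrew.json``."""
   
       def test_service_waiters(self):
           assert "job_complete" in GlueDataBrewHook(aws_conn_id=None).list_waiters()
   ```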



##########
tests/providers/amazon/aws/waiters/test_custom_waiters.py:
##########
@@ -424,3 +425,42 @@ def test_steps_failed(self, mock_list_steps):
                 StepIds=[self.STEP_ID1, self.STEP_ID2],
                 WaiterConfig={"Delay": 0.01, "MaxAttempts": 3},
             )
+
+
+class TestCustomDataBrewWaiters:
+    """Test waiters from ``amazon/aws/waiters/glue.json``."""
+
+    JOB_NAME = "test_job"
+    RUN_ID = "123"
+
+    @pytest.fixture(autouse=True)
+    def setup_test_cases(self, monkeypatch):
+        self.client = boto3.client("databrew", region_name="eu-west-3")
+        monkeypatch.setattr(GlueDataBrewHook, "conn", self.client)
+
+    def test_service_waiters(self):
+        hook_waiters = GlueDataBrewHook(aws_conn_id=None).list_waiters()
+        assert "job_complete" in hook_waiters
+
+    @pytest.fixture
+    def mock_describe_job_runs(self):
+        """Mock ``GlueDataBrewHook.Client.describe_job_run`` method."""
+        with mock.patch.object(self.client, "describe_job_run") as m:
+            yield m
+
+    @staticmethod
+    def describe_jobs(status: str):
+        """
+        Helper function to generate a minimal DescribeJobRun response for a single job.
+        https://docs.aws.amazon.com/databrew/latest/dg/API_DescribeJobRun.html

Review Comment:
   I know it's a nitpick, but an upcoming pydocstyle rule enforces a blank line there, so we may as well do it now.
   
   ```suggestion
        Helper function to generate a minimal DescribeJobRun response for a single job.

        https://docs.aws.amazon.com/databrew/latest/dg/API_DescribeJobRun.html
   ```



##########
airflow/providers/amazon/aws/waiters/databrew.json:
##########
@@ -0,0 +1,54 @@
+{
+    "version": 2,
+    "waiters": {
+        "job_complete": {
+            "operation": "DescribeJobRun",
+            "delay": 30,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "matcher": "path",
+                    "argument": "State",
+                    "expected": "STOPPED",
+                    "state": "success"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "State",
+                    "expected": "SUCCEEDED",
+                    "state": "success"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "State",
+                    "expected": "FAILED",
+                    "state": "success"

Review Comment:
   This threw me for a second :P 
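   
   If I'm reading it right, the intent is that the waiter only ends the polling loop on any terminal state, and the caller inspects the final state and decides what to do, so `FAILED` counts as "success" from the waiter's point of view. Roughly, on the calling side (hypothetical sketch; `hook`, `job_name`, and `run_id` come from the operator):
   
   ```python
   from airflow.exceptions import AirflowException
   
   # The waiter returns on any terminal state; the caller interprets it.
   status = hook.job_completion(job_name=job_name, run_id=run_id)
   if status in ("FAILED", "TIMEOUT"):
       raise AirflowException(f"Job run {run_id} finished with state {status}")
   ```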



##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -422,3 +422,55 @@ def create_or_update_glue_job(self) -> str | None:
             self.conn.create_job(**config)
 
         return self.job_name
+
+
+class GlueDataBrewHook(AwsBaseHook):

Review Comment:
   Yeah, it's in an awkward position and I guess either would work, but I'd 
lean towards separate in this case as well.



##########
tests/system/providers/amazon/aws/example_databrew.py:
##########
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pendulum
+
+from airflow.decorators import dag
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.glue import (
+    GlueDataBrewStartJobOperator,
+)
+from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder
+
+DAG_ID = "example_databrew"
+
+sys_test_context_task = SystemTestContextBuilder().build()
+
+
+@dag(DAG_ID, schedule="@once", start_date=pendulum.datetime(2023, 1, 1, tz="UTC"), catchup=False)

Review Comment:
   I know this works fine, but for the sake of consistency, could you reformat it to match the formatting of all the other AWS system tests?
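   
   For reference, one possible multi-line layout matching the other system tests (same arguments, just reformatted; the function name is a placeholder):
   
   ```python
   @dag(
       dag_id=DAG_ID,
       schedule="@once",
       start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
       catchup=False,
   )
   def example_databrew():  # placeholder name
       ...
   ```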


