hussein-awala commented on code in PR #34807:
URL: https://github.com/apache/airflow/pull/34807#discussion_r1349695817


##########
airflow/providers/amazon/aws/operators/glue.py:
##########
@@ -230,3 +230,99 @@ def on_kill(self):
             )
             if not response["SuccessfulSubmissions"]:
                 self.log.error("Failed to stop AWS Glue Job: %s. Run Id: %s", 
self.job_name, self._job_run_id)
+
+
+class GlueDataBrewStartJobOperator(BaseOperator):
+    """Start an AWS Glue DataBrew job.
+
+    AWS Glue DataBrew is a visual data preparation tool that makes it easier
+    for data analysts and data scientists to clean and normalize data
+    to prepare it for analytics and machine learning (ML).
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GlueDataBrewStartJobOperator`
+
+    :param job_name: unique job name per AWS Account
+    :param wait_for_completion: Whether to wait for job run completion. 
(default: True)
+    :param deferrable: If True, the operator will wait asynchronously for the 
job to complete.
+        This implies waiting for completion. This mode requires aiobotocore 
module to be installed.
+        (default: False)
+    :param delay: Time in seconds to wait between status checks. Default is 30.
+    """
+
+    template_fields: Sequence[str] = (
+        "job_name",
+        "wait_for_completion",
+        "delay",
+        "deferrable",
+    )
+
+    def __init__(
+        self,
+        job_name: str,
+        wait_for_completion: bool = True,
+        delay: int = 30,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.job_name = job_name
+        self.wait_for_completion = wait_for_completion
+        self.deferrable = deferrable
+        self.delay = delay
+        self.aws_conn_id = aws_conn_id
+
+    @cached_property
+    def data_brew_hook(self) -> GlueDataBrewHook:

Review Comment:
   Could you name this property `hook`, as we do in most of the other operators?
   ```suggestion
       def hook(self) -> GlueDataBrewHook:
   ```



##########
airflow/providers/amazon/aws/operators/glue.py:
##########
@@ -230,3 +230,99 @@ def on_kill(self):
             )
             if not response["SuccessfulSubmissions"]:
                 self.log.error("Failed to stop AWS Glue Job: %s. Run Id: %s", 
self.job_name, self._job_run_id)
+
+
+class GlueDataBrewStartJobOperator(BaseOperator):
+    """Start an AWS Glue DataBrew job.
+
+    AWS Glue DataBrew is a visual data preparation tool that makes it easier
+    for data analysts and data scientists to clean and normalize data
+    to prepare it for analytics and machine learning (ML).
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GlueDataBrewStartJobOperator`
+
+    :param job_name: unique job name per AWS Account
+    :param wait_for_completion: Whether to wait for job run completion. 
(default: True)
+    :param deferrable: If True, the operator will wait asynchronously for the 
job to complete.
+        This implies waiting for completion. This mode requires aiobotocore 
module to be installed.
+        (default: False)
+    :param delay: Time in seconds to wait between status checks. Default is 30.
+    """
+
+    template_fields: Sequence[str] = (
+        "job_name",
+        "wait_for_completion",
+        "delay",
+        "deferrable",
+    )
+
+    def __init__(
+        self,
+        job_name: str,
+        wait_for_completion: bool = True,
+        delay: int = 30,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.job_name = job_name
+        self.wait_for_completion = wait_for_completion
+        self.deferrable = deferrable
+        self.delay = delay
+        self.aws_conn_id = aws_conn_id
+
+    @cached_property
+    def data_brew_hook(self) -> GlueDataBrewHook:
+        return GlueDataBrewHook(aws_conn_id=self.aws_conn_id)
+
+    def execute(self, context: Context):
+        resp = {}
+        resp["job_name"] = self.job_name
+
+        job = self.data_brew_hook.conn.start_job_run(Name=self.job_name)
+        run_id = job["RunId"]
+        resp["run_id"] = run_id
+
+        status = self.data_brew_hook.get_job_state(self.job_name, run_id)
+        resp["status"] = status
+
+        self.log.info(
+            "AWS Glue DataBrew Job: %s. Run Id: %s submitted. Status: %s", 
self.job_name, run_id, status
+        )
+
+        if self.deferrable:
+            self.log.info("Deferring job %s with run_id %s", self.job_name, 
run_id)
+            self.defer(
+                trigger=GlueDataBrewJobCompleteTrigger(
+                    aws_conn_id=self.aws_conn_id, job_name=self.job_name, 
run_id=run_id, delay=self.delay
+                ),
+                method_name="execute_complete",
+            )
+
+        elif self.wait_for_completion:

Review Comment:
   What should happen when both `wait_for_completion` and `deferrable` are set to `True`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to