vincbeck commented on code in PR #34807: URL: https://github.com/apache/airflow/pull/34807#discussion_r1355765277
########## airflow/providers/amazon/aws/operators/glue.py: ########## @@ -230,3 +230,99 @@ def on_kill(self): ) if not response["SuccessfulSubmissions"]: self.log.error("Failed to stop AWS Glue Job: %s. Run Id: %s", self.job_name, self._job_run_id) + + +class GlueDataBrewStartJobOperator(BaseOperator): + """Start an AWS Glue DataBrew job. + + AWS Glue DataBrew is a visual data preparation tool that makes it easier + for data analysts and data scientists to clean and normalize data + to prepare it for analytics and machine learning (ML). + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GlueDataBrewStartJobOperator` + + :param job_name: unique job name per AWS Account + :param wait_for_completion: Whether to wait for job run completion. (default: True) + :param deferrable: If True, the operator will wait asynchronously for the job to complete. + This implies waiting for completion. This mode requires aiobotocore module to be installed. + (default: False) + :param delay: Time in seconds to wait between status checks. Default is 30. 
+ """ + + template_fields: Sequence[str] = ( + "job_name", + "wait_for_completion", + "delay", + "deferrable", + ) + + def __init__( + self, + job_name: str, + wait_for_completion: bool = True, + delay: int = 30, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), + aws_conn_id: str = "aws_default", + **kwargs, + ): + super().__init__(**kwargs) + self.job_name = job_name + self.wait_for_completion = wait_for_completion + self.deferrable = deferrable + self.delay = delay + self.aws_conn_id = aws_conn_id + + @cached_property + def hook(self) -> GlueDataBrewHook: + return GlueDataBrewHook(aws_conn_id=self.aws_conn_id) + + def execute(self, context: Context): + resp = {} + resp["job_name"] = self.job_name + + job = self.data_brew_hook.conn.start_job_run(Name=self.job_name) + run_id = job["RunId"] + resp["run_id"] = run_id + + status = self.data_brew_hook.get_job_state(self.job_name, run_id) + resp["status"] = status + + self.log.info( + "AWS Glue DataBrew Job: %s. Run Id: %s submitted. Status: %s", self.job_name, run_id, status + ) + + if self.deferrable: + self.log.info("Deferring job %s with run_id %s", self.job_name, run_id) + self.defer( + trigger=GlueDataBrewJobCompleteTrigger( + aws_conn_id=self.aws_conn_id, job_name=self.job_name, run_id=run_id, delay=self.delay + ), + method_name="execute_complete", + ) + + elif self.wait_for_completion: + self.log.info( + "Waiting for AWS Glue DataBrew Job: %s. Run Id: %s to complete.", self.job_name, run_id + ) + status = self.data_brew_hook.job_completion( + job_name=self.job_name, delay=self.delay, run_id=run_id + ) + self.log.info("Glue DataBrew Job: %s status: %s", self.job_name, status) + resp["status"] = status + + return resp Review Comment: Up to you to be honest. If you want to do it I'd do it in a separate PR though. 
On the other hand, it is pretty easy to get the status by just using the API, so I don't feel it is really necessary -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org