vincbeck commented on code in PR #34807: URL: https://github.com/apache/airflow/pull/34807#discussion_r1352719011
########## airflow/providers/amazon/aws/hooks/glue.py: ########## @@ -422,3 +422,55 @@ def create_or_update_glue_job(self) -> str | None: self.conn.create_job(**config) return self.job_name + + +class GlueDataBrewHook(AwsBaseHook): + """ + Interact with AWS DataBrew. + + Additional arguments (such as ``aws_conn_id``) may be specified and + are passed down to the underlying AwsBaseHook. + + .. seealso:: + - :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` + """ + + TERMINAL_STATES = ["STOPPED", "SUCCEEDED", "FAILED", "TIMEOUT"] + SUCCESS_STATES = ["SUCCEEDED"] + FAILURE_STATES = ["FAILED", "TIMEOUT"] + RUNNING_STATES = ["STARTING", "RUNNING", "STOPPING"] + + def __init__(self, *args, **kwargs): + kwargs["client_type"] = "databrew" + super().__init__(*args, **kwargs) + + def job_completion(self, job_name: str, run_id: str, delay: int = 10, maxAttempts: int = 60) -> str: Review Comment: Please use underscore: `max_attempts` ########## airflow/providers/amazon/aws/operators/glue.py: ########## @@ -230,3 +230,99 @@ def on_kill(self): ) if not response["SuccessfulSubmissions"]: self.log.error("Failed to stop AWS Glue Job: %s. Run Id: %s", self.job_name, self._job_run_id) + + +class GlueDataBrewStartJobOperator(BaseOperator): + """Start an AWS Glue DataBrew job. + + AWS Glue DataBrew is a visual data preparation tool that makes it easier + for data analysts and data scientists to clean and normalize data + to prepare it for analytics and machine learning (ML). + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GlueDataBrewStartJobOperator` + + :param job_name: unique job name per AWS Account + :param wait_for_completion: Whether to wait for job run completion. (default: True) + :param deferrable: If True, the operator will wait asynchronously for the job to complete. + This implies waiting for completion. This mode requires aiobotocore module to be installed. 
+ (default: False) + :param delay: Time in seconds to wait between status checks. Default is 30. + """ + + template_fields: Sequence[str] = ( + "job_name", + "wait_for_completion", + "delay", + "deferrable", + ) + + def __init__( + self, + job_name: str, + wait_for_completion: bool = True, + delay: int = 30, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), + aws_conn_id: str = "aws_default", + **kwargs, + ): + super().__init__(**kwargs) + self.job_name = job_name + self.wait_for_completion = wait_for_completion + self.deferrable = deferrable + self.delay = delay + self.aws_conn_id = aws_conn_id + + @cached_property + def hook(self) -> GlueDataBrewHook: + return GlueDataBrewHook(aws_conn_id=self.aws_conn_id) + + def execute(self, context: Context): + resp = {} + resp["job_name"] = self.job_name Review Comment: Why return the job name? It is part of the input; I don't think it is needed. ########## airflow/providers/amazon/aws/operators/glue.py: ########## @@ -230,3 +230,99 @@ def on_kill(self): ) if not response["SuccessfulSubmissions"]: self.log.error("Failed to stop AWS Glue Job: %s. Run Id: %s", self.job_name, self._job_run_id) + + +class GlueDataBrewStartJobOperator(BaseOperator): + """Start an AWS Glue DataBrew job. + + AWS Glue DataBrew is a visual data preparation tool that makes it easier + for data analysts and data scientists to clean and normalize data + to prepare it for analytics and machine learning (ML). + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GlueDataBrewStartJobOperator` + + :param job_name: unique job name per AWS Account + :param wait_for_completion: Whether to wait for job run completion. (default: True) + :param deferrable: If True, the operator will wait asynchronously for the job to complete. + This implies waiting for completion. This mode requires aiobotocore module to be installed.
+ (default: False) + :param delay: Time in seconds to wait between status checks. Default is 30. + """ + + template_fields: Sequence[str] = ( + "job_name", + "wait_for_completion", + "delay", + "deferrable", + ) + + def __init__( + self, + job_name: str, + wait_for_completion: bool = True, + delay: int = 30, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), + aws_conn_id: str = "aws_default", + **kwargs, + ): + super().__init__(**kwargs) + self.job_name = job_name + self.wait_for_completion = wait_for_completion + self.deferrable = deferrable + self.delay = delay + self.aws_conn_id = aws_conn_id + + @cached_property + def hook(self) -> GlueDataBrewHook: + return GlueDataBrewHook(aws_conn_id=self.aws_conn_id) + + def execute(self, context: Context): + resp = {} + resp["job_name"] = self.job_name + + job = self.data_brew_hook.conn.start_job_run(Name=self.job_name) + run_id = job["RunId"] + resp["run_id"] = run_id + + status = self.data_brew_hook.get_job_state(self.job_name, run_id) + resp["status"] = status + + self.log.info( + "AWS Glue DataBrew Job: %s. Run Id: %s submitted. Status: %s", self.job_name, run_id, status + ) + + if self.deferrable: + self.log.info("Deferring job %s with run_id %s", self.job_name, run_id) + self.defer( + trigger=GlueDataBrewJobCompleteTrigger( + aws_conn_id=self.aws_conn_id, job_name=self.job_name, run_id=run_id, delay=self.delay + ), + method_name="execute_complete", + ) + + elif self.wait_for_completion: + self.log.info( + "Waiting for AWS Glue DataBrew Job: %s. Run Id: %s to complete.", self.job_name, run_id + ) + status = self.data_brew_hook.job_completion( + job_name=self.job_name, delay=self.delay, run_id=run_id + ) + self.log.info("Glue DataBrew Job: %s status: %s", self.job_name, status) + resp["status"] = status + + return resp Review Comment: One proposal. Instead of being opinionated and return some info related to the job. 
Why don't we just return the job ID and let the user fetch the information they need? ########## airflow/providers/amazon/aws/triggers/glue.py: ########## @@ -148,3 +148,60 @@ async def run(self) -> AsyncIterator[TriggerEvent]: break else: await asyncio.sleep(self.waiter_delay) + + +class GlueDataBrewJobCompleteTrigger(BaseTrigger): + """ + Watches for a Glue DataBrew job, triggers when it finishes. + + :param job_name: Glue DataBrew job name + :param run_id: the ID of the specific run to watch for that job + :param delay: Number of seconds to wait between two checks. Default is 10 seconds. + :param maxAttempts: Maximum number of attempts to wait for the job to complete. Default is 60 attempts. Review Comment: ```suggestion :param max_attempts: Maximum number of attempts to wait for the job to complete. Default is 60 attempts. ``` ########## docs/apache-airflow-providers-amazon/operators/glue.rst: ########## @@ -100,8 +100,18 @@ use :class:`~airflow.providers.amazon.aws.sensors.glue.GlueJobSensor` :start-after: [START howto_sensor_glue] :end-before: [END howto_sensor_glue] +Start an AWS Glue DataBrew job Review Comment: ```suggestion .. _howto/operator:GlueDataBrewStartJobOperator: Start an AWS Glue DataBrew job ``` ########## tests/system/providers/amazon/aws/example_databrew.py: ########## @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pendulum + +from airflow.decorators import dag +from airflow.models.baseoperator import chain +from airflow.providers.amazon.aws.operators.glue import ( + GlueDataBrewStartJobOperator, +) +from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder + +DAG_ID = "example_databrew_dag" Review Comment: ```suggestion DAG_ID = "example_databrew" ``` ########## airflow/providers/amazon/aws/hooks/glue.py: ########## @@ -422,3 +422,55 @@ def create_or_update_glue_job(self) -> str | None: self.conn.create_job(**config) return self.job_name + + +class GlueDataBrewHook(AwsBaseHook): Review Comment: Yeah, I agree. I guess both are fine and it is just a matter of opinion, but I would move it to another file named `databrew.py` too. ########## airflow/providers/amazon/aws/triggers/glue.py: ########## @@ -148,3 +148,60 @@ async def run(self) -> AsyncIterator[TriggerEvent]: break else: await asyncio.sleep(self.waiter_delay) + + +class GlueDataBrewJobCompleteTrigger(BaseTrigger): Review Comment: There is an `AwsBaseWaiterTrigger` in the Amazon provider package that simplifies creating a trigger (and reduces duplicated code). Have you tried using it?
You can look at `AthenaTrigger` as an example of a trigger that uses it. ########## airflow/providers/amazon/aws/triggers/glue.py: ########## @@ -148,3 +148,60 @@ async def run(self) -> AsyncIterator[TriggerEvent]: break else: await asyncio.sleep(self.waiter_delay) + + +class GlueDataBrewJobCompleteTrigger(BaseTrigger): + """ + Watches for a Glue DataBrew job, triggers when it finishes. + + :param job_name: Glue DataBrew job name + :param run_id: the ID of the specific run to watch for that job + :param delay: Number of seconds to wait between two checks. Default is 10 seconds. + :param maxAttempts: Maximum number of attempts to wait for the job to complete. Default is 60 attempts. + :param aws_conn_id: The Airflow connection used for AWS credentials. + """ + + def __init__( + self, + job_name: str, + run_id: str, + aws_conn_id: str, + delay: int = 10, + maxAttempts: int = 60, Review Comment: ```suggestion max_attempts: int = 60, ``` ########## airflow/providers/amazon/aws/operators/glue.py: ########## @@ -230,3 +230,99 @@ def on_kill(self): ) if not response["SuccessfulSubmissions"]: self.log.error("Failed to stop AWS Glue Job: %s. Run Id: %s", self.job_name, self._job_run_id) + + +class GlueDataBrewStartJobOperator(BaseOperator): + """Start an AWS Glue DataBrew job. + + AWS Glue DataBrew is a visual data preparation tool that makes it easier + for data analysts and data scientists to clean and normalize data + to prepare it for analytics and machine learning (ML). + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GlueDataBrewStartJobOperator` + + :param job_name: unique job name per AWS Account + :param wait_for_completion: Whether to wait for job run completion. (default: True) + :param deferrable: If True, the operator will wait asynchronously for the job to complete. + This implies waiting for completion. This mode requires aiobotocore module to be installed.
+ (default: False) + :param delay: Time in seconds to wait between status checks. Default is 30. + """ + + template_fields: Sequence[str] = ( + "job_name", + "wait_for_completion", + "delay", + "deferrable", + ) + + def __init__( + self, + job_name: str, + wait_for_completion: bool = True, + delay: int = 30, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), + aws_conn_id: str = "aws_default", + **kwargs, + ): + super().__init__(**kwargs) + self.job_name = job_name + self.wait_for_completion = wait_for_completion + self.deferrable = deferrable + self.delay = delay + self.aws_conn_id = aws_conn_id + + @cached_property + def hook(self) -> GlueDataBrewHook: + return GlueDataBrewHook(aws_conn_id=self.aws_conn_id) + + def execute(self, context: Context): + resp = {} + resp["job_name"] = self.job_name + + job = self.data_brew_hook.conn.start_job_run(Name=self.job_name) + run_id = job["RunId"] + resp["run_id"] = run_id + + status = self.data_brew_hook.get_job_state(self.job_name, run_id) + resp["status"] = status + + self.log.info( + "AWS Glue DataBrew Job: %s. Run Id: %s submitted. Status: %s", self.job_name, run_id, status + ) + + if self.deferrable: + self.log.info("Deferring job %s with run_id %s", self.job_name, run_id) + self.defer( + trigger=GlueDataBrewJobCompleteTrigger( + aws_conn_id=self.aws_conn_id, job_name=self.job_name, run_id=run_id, delay=self.delay + ), + method_name="execute_complete", + ) + + elif self.wait_for_completion: + self.log.info( + "Waiting for AWS Glue DataBrew Job: %s. 
Run Id: %s to complete.", self.job_name, run_id + ) + status = self.data_brew_hook.job_completion( + job_name=self.job_name, delay=self.delay, run_id=run_id + ) + self.log.info("Glue DataBrew Job: %s status: %s", self.job_name, status) + resp["status"] = status + + return resp + + def execute_complete(self, context: Context, event=None) -> dict[str, str]: + result = {} + result["job_name"] = event.get("jobName", "") + result["run_id"] = event.get("runId", "") + result["status"] = event.get("status", "") Review Comment: ```suggestion result = { "job_name": event.get("jobName", ""), "run_id": event.get("runId", ""), "status": event.get("status", ""), } ``` ########## airflow/providers/amazon/aws/triggers/glue.py: ########## @@ -148,3 +148,60 @@ async def run(self) -> AsyncIterator[TriggerEvent]: break else: await asyncio.sleep(self.waiter_delay) + + +class GlueDataBrewJobCompleteTrigger(BaseTrigger): + """ + Watches for a Glue DataBrew job, triggers when it finishes. + + :param job_name: Glue DataBrew job name + :param run_id: the ID of the specific run to watch for that job + :param delay: Number of seconds to wait between two checks. Default is 10 seconds. + :param maxAttempts: Maximum number of attempts to wait for the job to complete. Default is 60 attempts. + :param aws_conn_id: The Airflow connection used for AWS credentials. + """ + + def __init__( + self, + job_name: str, + run_id: str, + aws_conn_id: str, + delay: int = 10, + maxAttempts: int = 60, + **kwargs, + ): + super().__init__(**kwargs) + self.job_name = job_name + self.run_id = run_id + self.aws_conn_id = aws_conn_id + self.delay = delay + self.maxAttempts = maxAttempts Review Comment: ```suggestion self.max_attempts = max_attempts ``` ########## tests/system/providers/amazon/aws/example_databrew.py: ########## @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pendulum + +from airflow.decorators import dag +from airflow.models.baseoperator import chain +from airflow.providers.amazon.aws.operators.glue import ( + GlueDataBrewStartJobOperator, +) +from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder + +DAG_ID = "example_databrew_dag" + +# Externally fetched variables: +JOB_NAME = "JOB_NAME" + + +sys_test_context_task = SystemTestContextBuilder().add_variable(JOB_NAME).build() + + +@dag(DAG_ID, schedule=None, start_date=pendulum.datetime(2023, 1, 1, tz="UTC"), catchup=False) Review Comment: ```suggestion @dag(DAG_ID, schedule="@once", start_date=pendulum.datetime(2023, 1, 1, tz="UTC"), catchup=False) ``` ########## tests/system/providers/amazon/aws/example_databrew.py: ########## @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pendulum + +from airflow.decorators import dag +from airflow.models.baseoperator import chain +from airflow.providers.amazon.aws.operators.glue import ( + GlueDataBrewStartJobOperator, +) +from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder + +DAG_ID = "example_databrew_dag" + +# Externally fetched variables: +JOB_NAME = "JOB_NAME" + + +sys_test_context_task = SystemTestContextBuilder().add_variable(JOB_NAME).build() Review Comment: We can generate the job name ourselves ```suggestion sys_test_context_task = SystemTestContextBuilder().build() ``` ########## tests/system/providers/amazon/aws/example_databrew.py: ########## @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import pendulum + +from airflow.decorators import dag +from airflow.models.baseoperator import chain +from airflow.providers.amazon.aws.operators.glue import ( + GlueDataBrewStartJobOperator, +) +from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder + +DAG_ID = "example_databrew_dag" + +# Externally fetched variables: +JOB_NAME = "JOB_NAME" + + +sys_test_context_task = SystemTestContextBuilder().add_variable(JOB_NAME).build() + + +@dag(DAG_ID, schedule=None, start_date=pendulum.datetime(2023, 1, 1, tz="UTC"), catchup=False) +def run_job(): + # [START howto_operator_glue_databrew_start] + + GlueDataBrewStartJobOperator(task_id="startjob", deferrable=True, job_name=JOB_NAME, delay=15) + + # [END howto_operator_glue_databrew_start] Review Comment: ```suggestion test_context = sys_test_context_task() env_id = test_context["ENV_ID"] job_name = f"{env_id}-databrew-job" # [START howto_operator_glue_databrew_start] start_job = GlueDataBrewStartJobOperator(task_id="startjob", deferrable=True, job_name=job_name, delay=15) # [END howto_operator_glue_databrew_start] ``` ########## tests/system/providers/amazon/aws/example_databrew.py: ########## @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pendulum + +from airflow.decorators import dag +from airflow.models.baseoperator import chain +from airflow.providers.amazon.aws.operators.glue import ( + GlueDataBrewStartJobOperator, +) +from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder + +DAG_ID = "example_databrew_dag" + +# Externally fetched variables: +JOB_NAME = "JOB_NAME" + + +sys_test_context_task = SystemTestContextBuilder().add_variable(JOB_NAME).build() + + +@dag(DAG_ID, schedule=None, start_date=pendulum.datetime(2023, 1, 1, tz="UTC"), catchup=False) +def run_job(): + # [START howto_operator_glue_databrew_start] + + GlueDataBrewStartJobOperator(task_id="startjob", deferrable=True, job_name=JOB_NAME, delay=15) + + # [END howto_operator_glue_databrew_start] + + +test_context = sys_test_context_task() + + +chain(test_context, run_job) + +from tests.system.utils.watcher import watcher # noqa: E402 + +# This test needs watcher in order to properly mark success/failure +# when "tearDown" task with trigger rule is part of the DAG +list(dag.tasks) >> watcher() Review Comment: ```suggestion chain(test_context, start_job) from tests.system.utils.watcher import watcher # noqa: E402 # This test needs watcher in order to properly mark success/failure # when "tearDown" task with trigger rule is part of the DAG list(dag.tasks) >> watcher() ``` ########## docs/apache-airflow-providers-amazon/operators/glue.rst: ########## @@ -100,8 +100,18 @@ use :class:`~airflow.providers.amazon.aws.sensors.glue.GlueJobSensor` :start-after: [START howto_sensor_glue] :end-before: [END howto_sensor_glue] +Start an AWS Glue DataBrew job +============================== + +To submit a new AWS Glue DataBrew job you can use :class:`~airflow.providers.amazon.aws.operators.glue.GlueDataBrewStartJobOperator`. + +.. 
exampleinclude:: /../../tests/system/providers/amazon/aws/example_databrew.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_glue_databrew_start] + :end-before: [END howto_operator_glue_databrew_start] + Reference --------- -* `AWS boto3 library documentation for Glue <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html>`__ -* `Glue IAM Role creation <https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html>`__ Review Comment: Why remove these? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org