This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 8cfb2be989 Add doc and example dag for AWS Step Functions Operators 8cfb2be989 is described below commit 8cfb2be98931e0f0bfb15ca411b36be3d6e66b80 Author: Niko Oliveira <oniko...@amazon.com> AuthorDate: Thu Apr 14 13:20:03 2022 -0700 Add doc and example dag for AWS Step Functions Operators --- .../aws/example_dags/example_step_functions.py | 56 ++++++++++++++++ .../amazon/aws/operators/step_function.py | 10 +-- .../providers/amazon/aws/sensors/step_function.py | 8 ++- airflow/providers/amazon/provider.yaml | 2 + .../operators/step_functions.rst | 78 ++++++++++++++++++++++ 5 files changed, 148 insertions(+), 6 deletions(-) diff --git a/airflow/providers/amazon/aws/example_dags/example_step_functions.py b/airflow/providers/amazon/aws/example_dags/example_step_functions.py new file mode 100644 index 0000000000..9a0ac2474c --- /dev/null +++ b/airflow/providers/amazon/aws/example_dags/example_step_functions.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from datetime import datetime +from os import environ + +from airflow import DAG +from airflow.models.baseoperator import chain +from airflow.providers.amazon.aws.operators.step_function import ( + StepFunctionGetExecutionOutputOperator, + StepFunctionStartExecutionOperator, +) +from airflow.providers.amazon.aws.sensors.step_function import StepFunctionExecutionSensor + +STEP_FUNCTIONS_STATE_MACHINE_ARN = environ.get('STEP_FUNCTIONS_STATE_MACHINE_ARN', 'state_machine_arn') + +with DAG( + dag_id='example_step_functions', + schedule_interval=None, + start_date=datetime(2021, 1, 1), + tags=['example'], + catchup=False, +) as dag: + + # [START howto_operator_step_function_start_execution] + start_execution = StepFunctionStartExecutionOperator( + task_id='start_execution', state_machine_arn=STEP_FUNCTIONS_STATE_MACHINE_ARN + ) + # [END howto_operator_step_function_start_execution] + + # [START howto_operator_step_function_execution_sensor] + wait_for_execution = StepFunctionExecutionSensor( + task_id='wait_for_execution', execution_arn=start_execution.output + ) + # [END howto_operator_step_function_execution_sensor] + + # [START howto_operator_step_function_get_execution_output] + get_execution_output = StepFunctionGetExecutionOutputOperator( + task_id='get_execution_output', execution_arn=start_execution.output + ) + # [END howto_operator_step_function_get_execution_output] + + chain(start_execution, wait_for_execution, get_execution_output) diff --git a/airflow/providers/amazon/aws/operators/step_function.py b/airflow/providers/amazon/aws/operators/step_function.py index b800ea90d3..7c32b33890 100644 --- a/airflow/providers/amazon/aws/operators/step_function.py +++ b/airflow/providers/amazon/aws/operators/step_function.py @@ -29,12 +29,13 @@ if TYPE_CHECKING: class StepFunctionStartExecutionOperator(BaseOperator): """ - An Operator that begins execution of an Step Function State Machine + An Operator that begins execution of an AWS Step Function State Machine. Additional arguments may be specified and are passed down to the underlying BaseOperator. .. seealso:: - :class:`~airflow.models.BaseOperator` + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:StepFunctionStartExecutionOperator` :param state_machine_arn: ARN of the Step Function State Machine :param name: The name of the execution. @@ -79,12 +80,13 @@ class StepFunctionStartExecutionOperator(BaseOperator): class StepFunctionGetExecutionOutputOperator(BaseOperator): """ - An Operator that begins execution of an Step Function State Machine + An Operator that returns the output of an AWS Step Function State Machine execution. Additional arguments may be specified and are passed down to the underlying BaseOperator. .. seealso:: - :class:`~airflow.models.BaseOperator` + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:StepFunctionGetExecutionOutputOperator` :param execution_arn: ARN of the Step Function State Machine Execution :param aws_conn_id: aws connection to use, defaults to 'aws_default' diff --git a/airflow/providers/amazon/aws/sensors/step_function.py b/airflow/providers/amazon/aws/sensors/step_function.py index 3c170c0727..085ca21fc4 100644 --- a/airflow/providers/amazon/aws/sensors/step_function.py +++ b/airflow/providers/amazon/aws/sensors/step_function.py @@ -28,13 +28,17 @@ if TYPE_CHECKING: class StepFunctionExecutionSensor(BaseSensorOperator): """ - Asks for the state of the Step Function State Machine Execution until it + Asks for the state of the AWS Step Function State Machine Execution until it reaches a failure state or success state. - If it fails, failing the task. + If it fails, then fail the task. On successful completion of the Execution the Sensor will do an XCom Push of the State Machine's output to `output` + .. seealso:: + For more information on how to use this sensor, take a look at the guide: + :ref:`howto/operator:StepFunctionExecutionSensor` + :param execution_arn: execution_arn to check the state of :param aws_conn_id: aws connection to use, defaults to 'aws_default' """ diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index c1cfe7abf0..13b84639f5 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -190,6 +190,8 @@ integrations: - integration-name: AWS Step Functions external-doc-url: https://aws.amazon.com/step-functions/ logo: /integration-logos/aws/aws-step-functions_light...@4x.png + how-to-guide: + - /docs/apache-airflow-providers-amazon/operators/step_functions.rst tags: [aws] - integration-name: AWS Database Migration Service external-doc-url: https://aws.amazon.com/dms/ diff --git a/docs/apache-airflow-providers-amazon/operators/step_functions.rst b/docs/apache-airflow-providers-amazon/operators/step_functions.rst new file mode 100644 index 0000000000..8d14af78fc --- /dev/null +++ b/docs/apache-airflow-providers-amazon/operators/step_functions.rst @@ -0,0 +1,78 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +AWS Step Functions Operators +============================ + +`AWS Step Functions <https://aws.amazon.com/step-functions/>`__ makes it easy to coordinate the components +of distributed applications as a series of steps in a visual workflow. You can quickly build and run state +machines to execute the steps of your application in a reliable and scalable fashion. + +Prerequisite Tasks +------------------ + +.. include:: _partials/prerequisite_tasks.rst + +.. _howto/operator:StepFunctionStartExecutionOperator: + +AWS Step Functions Start Execution Operator +""""""""""""""""""""""""""""""""""""""""""" + +To start a new AWS Step Functions State Machine execution +use :class:`~airflow.providers.amazon.aws.operators.step_function.StepFunctionStartExecutionOperator`. + +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_step_functions.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_step_function_start_execution] + :end-before: [END howto_operator_step_function_start_execution] + +.. _howto/operator:StepFunctionExecutionSensor: + +AWS Step Functions Execution Sensor +""""""""""""""""""""""""""""""""""" + +To wait on the state of an AWS Step Function State Machine execution until it reaches a terminal state you can +use :class:`~airflow.providers.amazon.aws.sensors.step_function.StepFunctionExecutionSensor`. + +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_step_functions.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_step_function_execution_sensor] + :end-before: [END howto_operator_step_function_execution_sensor] + +.. _howto/operator:StepFunctionGetExecutionOutputOperator: + +AWS Step Functions Get Execution Output Operator +"""""""""""""""""""""""""""""""""""""""""""""""" + +To fetch the output from an AWS Step Function State Machine execution you can +use :class:`~airflow.providers.amazon.aws.operators.step_function.StepFunctionGetExecutionOutputOperator`. + +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_step_functions.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_step_function_get_execution_output] + :end-before: [END howto_operator_step_function_get_execution_output] + +References +---------- + +For further information, look at: + +* `Boto3 Library Documentation for Step Functions <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html>`__