This is an automated email from the ASF dual-hosted git repository.
joshfell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e477f4ba6c Amazon appflow (#24057)
e477f4ba6c is described below
commit e477f4ba6cd15fabbfe5210c99947bcb70ddac4f
Author: Igor Tavares <[email protected]>
AuthorDate: Tue Jun 21 20:14:46 2022 -0300
Amazon appflow (#24057)
* Add Amazon AppFlow hook.
* Add Amazon AppFlow operators.
* Add Amazon AppFlow examples.
* Add Amazon Appflow docs.
* Apply comments/docs patterns.
* Removing the "private" attribute signal and more.
* Fix task_ids for example_appflow.
* Move datetime_to_epoch() to utils and more.
* Fix the AppflowBaseOperator name.
* Ignore AppflowBaseOperator during structure check.
* test_short_circuit refactor.
* Add get_airflow_version.
* Update airflow/providers/amazon/aws/hooks/appflow.py
Co-authored-by: Josh Fell <[email protected]>
* Update airflow/providers/amazon/aws/operators/appflow.py
Co-authored-by: Josh Fell <[email protected]>
* Update airflow/providers/amazon/aws/operators/appflow.py
Co-authored-by: Josh Fell <[email protected]>
* Update airflow/providers/amazon/aws/operators/appflow.py
Co-authored-by: Josh Fell <[email protected]>
* Update airflow/providers/amazon/aws/operators/appflow.py
Co-authored-by: Josh Fell <[email protected]>
* Update airflow/providers/amazon/aws/operators/appflow.py
Co-authored-by: Josh Fell <[email protected]>
* Addressing Josh's requests.
* Add cached_property to AppflowHook
* Update airflow/providers/amazon/aws/hooks/appflow.py
Co-authored-by: Josh Fell <[email protected]>
* Update airflow/providers/amazon/aws/operators/appflow.py
Co-authored-by: Josh Fell <[email protected]>
* Update airflow/providers/amazon/aws/operators/appflow.py
Co-authored-by: Josh Fell <[email protected]>
* Update Josh's comment.
* Update cached_property import.
* Fix mypy.
Co-authored-by: Josh Fell <[email protected]>
---
.../amazon/aws/example_dags/example_appflow.py | 110 +++++
airflow/providers/amazon/aws/hooks/appflow.py | 145 ++++++
airflow/providers/amazon/aws/operators/appflow.py | 485 +++++++++++++++++++++
.../amazon/aws/operators/redshift_data.py | 1 +
.../amazon/aws/secrets/secrets_manager.py | 4 +-
.../amazon/aws/secrets/systems_manager.py | 4 +-
airflow/providers/amazon/aws/utils/__init__.py | 26 ++
airflow/providers/amazon/provider.yaml | 12 +
docs/apache-airflow-providers-amazon/index.rst | 1 +
.../operators/appflow.rst | 146 +++++++
.../integration-logos/aws/Amazon_AppFlow_light.png | Bin 0 -> 59266 bytes
docs/spelling_wordlist.txt | 1 +
setup.py | 1 +
tests/always/test_project_structure.py | 1 +
tests/providers/amazon/aws/hooks/test_appflow.py | 90 ++++
.../providers/amazon/aws/operators/test_appflow.py | 171 ++++++++
.../providers/amazon/aws/utils/test_utils.py | 28 ++
17 files changed, 1222 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/amazon/aws/example_dags/example_appflow.py b/airflow/providers/amazon/aws/example_dags/example_appflow.py
new file mode 100644
index 0000000000..06903c73d9
--- /dev/null
+++ b/airflow/providers/amazon/aws/example_dags/example_appflow.py
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.models.baseoperator import chain
+from airflow.operators.bash import BashOperator
+from airflow.providers.amazon.aws.operators.appflow import (
+ AppflowRecordsShortCircuitOperator,
+ AppflowRunAfterOperator,
+ AppflowRunBeforeOperator,
+ AppflowRunDailyOperator,
+ AppflowRunFullOperator,
+ AppflowRunOperator,
+)
+
+SOURCE_NAME = "salesforce"
+FLOW_NAME = "salesforce-campaign"
+
+with DAG(
+ "example_appflow",
+ schedule_interval=None,
+ start_date=datetime(2022, 1, 1),
+ catchup=False,
+ tags=["example"],
+) as dag:
+
+ # [START howto_operator_appflow_run]
+ campaign_dump = AppflowRunOperator(
+ task_id="campaign_dump",
+ source=SOURCE_NAME,
+ flow_name=FLOW_NAME,
+ )
+ # [END howto_operator_appflow_run]
+
+ # [START howto_operator_appflow_run_full]
+ campaign_dump_full = AppflowRunFullOperator(
+ task_id="campaign_dump_full",
+ source=SOURCE_NAME,
+ flow_name=FLOW_NAME,
+ )
+ # [END howto_operator_appflow_run_full]
+
+ # [START howto_operator_appflow_run_daily]
+ campaign_dump_daily = AppflowRunDailyOperator(
+ task_id="campaign_dump_daily",
+ source=SOURCE_NAME,
+ flow_name=FLOW_NAME,
+ source_field="LastModifiedDate",
+ filter_date="{{ ds }}",
+ )
+ # [END howto_operator_appflow_run_daily]
+
+ # [START howto_operator_appflow_run_before]
+ campaign_dump_before = AppflowRunBeforeOperator(
+ task_id="campaign_dump_before",
+ source=SOURCE_NAME,
+ flow_name=FLOW_NAME,
+ source_field="LastModifiedDate",
+ filter_date="{{ ds }}",
+ )
+ # [END howto_operator_appflow_run_before]
+
+ # [START howto_operator_appflow_run_after]
+ campaign_dump_after = AppflowRunAfterOperator(
+ task_id="campaign_dump_after",
+ source=SOURCE_NAME,
+ flow_name=FLOW_NAME,
+ source_field="LastModifiedDate",
+ filter_date="3000-01-01", # Future date, so no records to dump
+ )
+ # [END howto_operator_appflow_run_after]
+
+ # [START howto_operator_appflow_shortcircuit]
+ campaign_dump_short_circuit = AppflowRecordsShortCircuitOperator(
+ task_id="campaign_dump_short_circuit",
+ flow_name=FLOW_NAME,
+ appflow_run_task_id="campaign_dump_after", # Should short-circuit, no records expected
+ )
+ # [END howto_operator_appflow_shortcircuit]
+
+ should_be_skipped = BashOperator(
+ task_id="should_be_skipped",
+ bash_command="echo 1",
+ )
+
+ chain(
+ campaign_dump,
+ campaign_dump_full,
+ campaign_dump_daily,
+ campaign_dump_before,
+ campaign_dump_after,
+ campaign_dump_short_circuit,
+ should_be_skipped,
+ )
diff --git a/airflow/providers/amazon/aws/hooks/appflow.py b/airflow/providers/amazon/aws/hooks/appflow.py
new file mode 100644
index 0000000000..e2b628bf3d
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/appflow.py
@@ -0,0 +1,145 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+from datetime import datetime, timezone
+from time import sleep
+from typing import TYPE_CHECKING, List
+
+from airflow.compat.functools import cached_property
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+if TYPE_CHECKING:
+ from mypy_boto3_appflow.client import AppflowClient
+ from mypy_boto3_appflow.type_defs import TaskTypeDef
+
+
+class AppflowHook(AwsBaseHook):
+ """
+ Interact with Amazon Appflow, using the boto3 library.
+ Hook attribute ``conn`` has all methods that are listed in the documentation.
+
+ .. seealso::
+ - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/appflow.html
+ - https://docs.aws.amazon.com/appflow/1.0/APIReference/Welcome.html
+
+ Additional arguments (such as ``aws_conn_id`` or ``region_name``) may be specified and
+ are passed down to the underlying AwsBaseHook.
+
+ .. seealso::
+ :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+ """
+
+ EVENTUAL_CONSISTENCY_OFFSET: int = 15 # seconds
+ EVENTUAL_CONSISTENCY_POLLING: int = 10 # seconds
+
+ def __init__(self, *args, **kwargs) -> None:
+ kwargs["client_type"] = "appflow"
+ super().__init__(*args, **kwargs)
+
+ @cached_property
+ def conn(self) -> 'AppflowClient':
+ """Get the underlying boto3 Appflow client (cached)"""
+ return super().conn
+
+ def run_flow(self, flow_name: str, poll_interval: int = 20) -> str:
+ """
+ Execute an AppFlow run.
+
+ :param flow_name: The flow name
+ :param poll_interval: Time (seconds) to wait between two consecutive calls to check the run status
+ :return: The run execution ID
+ """
+ ts_before: datetime = datetime.now(timezone.utc)
+ sleep(self.EVENTUAL_CONSISTENCY_OFFSET)
+ response_start = self.conn.start_flow(flowName=flow_name)
+ execution_id = response_start["executionId"]
+ self.log.info("executionId: %s", execution_id)
+
+ response_desc = self.conn.describe_flow(flowName=flow_name)
+ last_exec_details = response_desc["lastRunExecutionDetails"]
+
+ # Wait for Appflow eventual consistency
+ self.log.info("Waiting for Appflow eventual consistency...")
+ while (
+ response_desc.get("lastRunExecutionDetails", {}).get(
+ "mostRecentExecutionTime", datetime(1970, 1, 1,
tzinfo=timezone.utc)
+ )
+ < ts_before
+ ):
+ sleep(self.EVENTUAL_CONSISTENCY_POLLING)
+ response_desc = self.conn.describe_flow(flowName=flow_name)
+ last_exec_details = response_desc["lastRunExecutionDetails"]
+
+ # Wait until the flow run stops
+ self.log.info("Waiting for flow run...")
+ while (
+ "mostRecentExecutionStatus" not in last_exec_details
+ or last_exec_details["mostRecentExecutionStatus"] == "InProgress"
+ ):
+ sleep(poll_interval)
+ response_desc = self.conn.describe_flow(flowName=flow_name)
+ last_exec_details = response_desc["lastRunExecutionDetails"]
+
+ self.log.info("lastRunExecutionDetails: %s", last_exec_details)
+
+ if last_exec_details["mostRecentExecutionStatus"] == "Error":
+ raise Exception(f"Flow error:\n{json.dumps(response_desc,
default=str)}")
+
+ return execution_id
+
+ def update_flow_filter(
+ self, flow_name: str, filter_tasks: List["TaskTypeDef"], set_trigger_ondemand: bool = False
+ ) -> None:
+ """
+ Update the flow task filter.
+ All filters will be removed if an empty array is passed to filter_tasks.
+
+ :param flow_name: The flow name
+ :param filter_tasks: List flow tasks to be added
+ :param set_trigger_ondemand: If True, set the trigger to on-demand; otherwise, keep the trigger as is
+ :return: None
+ """
+ response = self.conn.describe_flow(flowName=flow_name)
+ connector_type = response["sourceFlowConfig"]["connectorType"]
+ tasks: List["TaskTypeDef"] = []
+
+ # cleanup old filter tasks
+ for task in response["tasks"]:
+ if (
+ task["taskType"] == "Filter"
+ and task.get("connectorOperator", {}).get(connector_type) != "PROJECTION"
+ ):
+ self.log.info("Removing task: %s", task)
+ else:
+ tasks.append(task) # List of non-filter tasks
+
+ tasks += filter_tasks # Add the new filter tasks
+
+ if set_trigger_ondemand:
+ # Clean up attribute to force on-demand trigger
+ del response["triggerConfig"]["triggerProperties"]
+
+ self.conn.update_flow(
+ flowName=response["flowName"],
+ destinationFlowConfigList=response["destinationFlowConfigList"],
+ sourceFlowConfig=response["sourceFlowConfig"],
+ triggerConfig=response["triggerConfig"],
+ description=response.get("description", "Flow description."),
+ tasks=tasks,
+ )
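
For context, here is a minimal sketch of driving the new hook directly, outside a DAG. The names come from the diff above; the "aws_default" connection, the region, and the flow name "salesforce-campaign" (borrowed from the example DAG) are illustrative assumptions.

    from airflow.providers.amazon.aws.hooks.appflow import AppflowHook

    # Assumed: an "aws_default" Airflow connection and an AppFlow flow
    # named "salesforce-campaign" already configured in the account.
    hook = AppflowHook(aws_conn_id="aws_default", region_name="us-east-1")
    execution_id = hook.run_flow(flow_name="salesforce-campaign", poll_interval=20)
    # run_flow() sleeps through AppFlow's eventual-consistency window, polls
    # describe_flow() until the run leaves "InProgress", and raises on "Error".
    print(execution_id)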
diff --git a/airflow/providers/amazon/aws/operators/appflow.py b/airflow/providers/amazon/aws/operators/appflow.py
new file mode 100644
index 0000000000..a63ec8ccdb
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/appflow.py
@@ -0,0 +1,485 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING, List, Optional, cast
+
+from airflow.compat.functools import cached_property
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.operators.python import ShortCircuitOperator
+from airflow.providers.amazon.aws.hooks.appflow import AppflowHook
+from airflow.providers.amazon.aws.utils import datetime_to_epoch_ms, get_airflow_version
+
+if TYPE_CHECKING:
+ from mypy_boto3_appflow.type_defs import (
+ DescribeFlowExecutionRecordsResponseTypeDef,
+ ExecutionRecordTypeDef,
+ TaskTypeDef,
+ )
+
+ from airflow.utils.context import Context
+
+
+SUPPORTED_SOURCES = {"salesforce", "zendesk"}
+MANDATORY_FILTER_DATE_MSG = "The filter_date argument is mandatory for {entity}!"
+NOT_SUPPORTED_SOURCE_MSG = "Source {source} is not supported for {entity}!"
+
+
+class AppflowBaseOperator(BaseOperator):
+ """
+ Amazon Appflow Base Operator class (not supposed to be used directly in DAGs).
+
+ :param source: The source name (Supported: salesforce, zendesk)
+ :param flow_name: The flow name
+ :param flow_update: A boolean to enable/disable a flow update before the run
+ :param source_field: The field name to apply filters
+ :param filter_date: The date value (or template) to be used in filters.
+ :param poll_interval: how often in seconds to check the query status
+ :param aws_conn_id: aws connection to use
+ :param region: aws region to use
+ """
+
+ ui_color = "#2bccbd"
+
+ def __init__(
+ self,
+ source: str,
+ flow_name: str,
+ flow_update: bool,
+ source_field: Optional[str] = None,
+ filter_date: Optional[str] = None,
+ poll_interval: int = 20,
+ aws_conn_id: str = "aws_default",
+ region: Optional[str] = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ if source not in SUPPORTED_SOURCES:
+ raise ValueError(f"{source} is not a supported source (options:
{SUPPORTED_SOURCES})!")
+ self.filter_date = filter_date
+ self.flow_name = flow_name
+ self.source = source
+ self.source_field = source_field
+ self.poll_interval = poll_interval
+ self.aws_conn_id = aws_conn_id
+ self.region = region
+ self.flow_update = flow_update
+
+ @cached_property
+ def hook(self) -> AppflowHook:
+ """Create and return an AppflowHook."""
+ return AppflowHook(aws_conn_id=self.aws_conn_id, region_name=self.region)
+
+ def execute(self, context: "Context") -> None:
+ self.filter_date_parsed: Optional[datetime] = (
+ datetime.fromisoformat(self.filter_date) if self.filter_date else None
+ )
+ self.connector_type = self._get_connector_type()
+ if self.flow_update:
+ self._update_flow()
+ self._run_flow(context)
+
+ def _get_connector_type(self) -> str:
+ response = self.hook.conn.describe_flow(flowName=self.flow_name)
+ connector_type = response["sourceFlowConfig"]["connectorType"]
+ if self.source != connector_type.lower():
+ raise ValueError(f"Incompatible source ({self.source} and
connector type ({connector_type})!")
+ return connector_type
+
+ def _update_flow(self) -> None:
+ self.hook.update_flow_filter(flow_name=self.flow_name, filter_tasks=[], set_trigger_ondemand=True)
+
+ def _run_flow(self, context) -> str:
+ execution_id = self.hook.run_flow(flow_name=self.flow_name, poll_interval=self.poll_interval)
+ task_instance = context["task_instance"]
+ task_instance.xcom_push("execution_id", execution_id)
+ return execution_id
+
+
+class AppflowRunOperator(AppflowBaseOperator):
+ """
+ Execute an Appflow run with filters as is.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:AppflowRunOperator`
+
+ :param source: The source name (Supported: salesforce, zendesk)
+ :param flow_name: The flow name
+ :param poll_interval: how often in seconds to check the query status
+ :param aws_conn_id: aws connection to use
+ :param region: aws region to use
+ """
+
+ def __init__(
+ self,
+ source: str,
+ flow_name: str,
+ poll_interval: int = 20,
+ aws_conn_id: str = "aws_default",
+ region: Optional[str] = None,
+ **kwargs,
+ ) -> None:
+ if source not in {"salesforce", "zendesk"}:
+ raise ValueError(NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunOperator"))
+ super().__init__(
+ source=source,
+ flow_name=flow_name,
+ flow_update=False,
+ source_field=None,
+ filter_date=None,
+ poll_interval=poll_interval,
+ aws_conn_id=aws_conn_id,
+ region=region,
+ **kwargs,
+ )
+
+
+class AppflowRunFullOperator(AppflowBaseOperator):
+ """
+ Execute an Appflow full run removing any filter.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:AppflowRunFullOperator`
+
+ :param source: The source name (Supported: salesforce, zendesk)
+ :param flow_name: The flow name
+ :param poll_interval: how often in seconds to check the query status
+ :param aws_conn_id: aws connection to use
+ :param region: aws region to use
+ """
+
+ def __init__(
+ self,
+ source: str,
+ flow_name: str,
+ poll_interval: int = 20,
+ aws_conn_id: str = "aws_default",
+ region: Optional[str] = None,
+ **kwargs,
+ ) -> None:
+ if source not in {"salesforce", "zendesk"}:
+ raise ValueError(NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunFullOperator"))
+ super().__init__(
+ source=source,
+ flow_name=flow_name,
+ flow_update=True,
+ source_field=None,
+ filter_date=None,
+ poll_interval=poll_interval,
+ aws_conn_id=aws_conn_id,
+ region=region,
+ **kwargs,
+ )
+
+
+class AppflowRunBeforeOperator(AppflowBaseOperator):
+ """
+ Execute an Appflow run after updating the filters to select only previous data.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:AppflowRunBeforeOperator`
+
+ :param source: The source name (Supported: salesforce)
+ :param flow_name: The flow name
+ :param source_field: The field name to apply filters
+ :param filter_date: The date value (or template) to be used in filters.
+ :param poll_interval: how often in seconds to check the query status
+ :param aws_conn_id: aws connection to use
+ :param region: aws region to use
+ """
+
+ template_fields = ("filter_date",)
+
+ def __init__(
+ self,
+ source: str,
+ flow_name: str,
+ source_field: str,
+ filter_date: str,
+ poll_interval: int = 20,
+ aws_conn_id: str = "aws_default",
+ region: Optional[str] = None,
+ **kwargs,
+ ) -> None:
+ if not filter_date:
+ raise ValueError(MANDATORY_FILTER_DATE_MSG.format(entity="AppflowRunBeforeOperator"))
+ if source != "salesforce":
+ raise ValueError(
+ NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunBeforeOperator")
+ )
+ super().__init__(
+ source=source,
+ flow_name=flow_name,
+ flow_update=True,
+ source_field=source_field,
+ filter_date=filter_date,
+ poll_interval=poll_interval,
+ aws_conn_id=aws_conn_id,
+ region=region,
+ **kwargs,
+ )
+
+ def _update_flow(self) -> None:
+ if not self.filter_date_parsed:
+ raise ValueError(f"Invalid filter_date argument parser value:
{self.filter_date_parsed}")
+ if not self.source_field:
+ raise ValueError(f"Invalid source_field argument value:
{self.source_field}")
+ filter_task: "TaskTypeDef" = {
+ "taskType": "Filter",
+ "connectorOperator": {self.connector_type: "LESS_THAN"}, # type:
ignore
+ "sourceFields": [self.source_field],
+ "taskProperties": {
+ "DATA_TYPE": "datetime",
+ "VALUE": str(datetime_to_epoch_ms(self.filter_date_parsed)),
+ }, # NOT inclusive
+ }
+ self.hook.update_flow_filter(
+ flow_name=self.flow_name, filter_tasks=[filter_task], set_trigger_ondemand=True
+ )
+
+
+class AppflowRunAfterOperator(AppflowBaseOperator):
+ """
+ Execute an Appflow run after updating the filters to select only future data.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:AppflowRunAfterOperator`
+
+ :param source: The source name (Supported: salesforce, zendesk)
+ :param flow_name: The flow name
+ :param source_field: The field name to apply filters
+ :param filter_date: The date value (or template) to be used in filters.
+ :param poll_interval: how often in seconds to check the query status
+ :param aws_conn_id: aws connection to use
+ :param region: aws region to use
+ """
+
+ template_fields = ("filter_date",)
+
+ def __init__(
+ self,
+ source: str,
+ flow_name: str,
+ source_field: str,
+ filter_date: str,
+ poll_interval: int = 20,
+ aws_conn_id: str = "aws_default",
+ region: Optional[str] = None,
+ **kwargs,
+ ) -> None:
+ if not filter_date:
+ raise ValueError(MANDATORY_FILTER_DATE_MSG.format(entity="AppflowRunAfterOperator"))
+ if source not in {"salesforce", "zendesk"}:
+ raise ValueError(NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunAfterOperator"))
+ super().__init__(
+ source=source,
+ flow_name=flow_name,
+ flow_update=True,
+ source_field=source_field,
+ filter_date=filter_date,
+ poll_interval=poll_interval,
+ aws_conn_id=aws_conn_id,
+ region=region,
+ **kwargs,
+ )
+
+ def _update_flow(self) -> None:
+ if not self.filter_date_parsed:
+ raise ValueError(f"Invalid filter_date argument parser value:
{self.filter_date_parsed}")
+ if not self.source_field:
+ raise ValueError(f"Invalid source_field argument value:
{self.source_field}")
+ filter_task: "TaskTypeDef" = {
+ "taskType": "Filter",
+ "connectorOperator": {self.connector_type: "GREATER_THAN"}, #
type: ignore
+ "sourceFields": [self.source_field],
+ "taskProperties": {
+ "DATA_TYPE": "datetime",
+ "VALUE": str(datetime_to_epoch_ms(self.filter_date_parsed)),
+ }, # NOT inclusive
+ }
+ self.hook.update_flow_filter(
+ flow_name=self.flow_name, filter_tasks=[filter_task], set_trigger_ondemand=True
+ )
+
+
+class AppflowRunDailyOperator(AppflowBaseOperator):
+ """
+ Execute an Appflow run after updating the filters to select only a single day.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:AppflowRunDailyOperator`
+
+ :param source: The source name (Supported: salesforce)
+ :param flow_name: The flow name
+ :param source_field: The field name to apply filters
+ :param filter_date: The date value (or template) to be used in filters.
+ :param poll_interval: how often in seconds to check the query status
+ :param aws_conn_id: aws connection to use
+ :param region: aws region to use
+ """
+
+ template_fields = ("filter_date",)
+
+ def __init__(
+ self,
+ source: str,
+ flow_name: str,
+ source_field: str,
+ filter_date: str,
+ poll_interval: int = 20,
+ aws_conn_id: str = "aws_default",
+ region: Optional[str] = None,
+ **kwargs,
+ ) -> None:
+ if not filter_date:
+ raise ValueError(MANDATORY_FILTER_DATE_MSG.format(entity="AppflowRunDailyOperator"))
+ if source != "salesforce":
+ raise ValueError(NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunDailyOperator"))
+ super().__init__(
+ source=source,
+ flow_name=flow_name,
+ flow_update=True,
+ source_field=source_field,
+ filter_date=filter_date,
+ poll_interval=poll_interval,
+ aws_conn_id=aws_conn_id,
+ region=region,
+ **kwargs,
+ )
+
+ def _update_flow(self) -> None:
+ if not self.filter_date_parsed:
+ raise ValueError(f"Invalid filter_date argument parser value:
{self.filter_date_parsed}")
+ if not self.source_field:
+ raise ValueError(f"Invalid source_field argument value:
{self.source_field}")
+ start_filter_date = self.filter_date_parsed - timedelta(milliseconds=1)
+ end_filter_date = self.filter_date_parsed + timedelta(days=1)
+ filter_task: "TaskTypeDef" = {
+ "taskType": "Filter",
+ "connectorOperator": {self.connector_type: "BETWEEN"}, # type:
ignore
+ "sourceFields": [self.source_field],
+ "taskProperties": {
+ "DATA_TYPE": "datetime",
+ "LOWER_BOUND": str(datetime_to_epoch_ms(start_filter_date)),
# NOT inclusive
+ "UPPER_BOUND": str(datetime_to_epoch_ms(end_filter_date)), #
NOT inclusive
+ },
+ }
+ self.hook.update_flow_filter(
+ flow_name=self.flow_name, filter_tasks=[filter_task], set_trigger_ondemand=True
+ )
+
+
+class AppflowRecordsShortCircuitOperator(ShortCircuitOperator):
+ """
+ Short-circuit in case of an empty Appflow run.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:AppflowRecordsShortCircuitOperator`
+
+ :param flow_name: The flow name
+ :param appflow_run_task_id: Run task ID from where this operator should extract the execution ID
+ :param ignore_downstream_trigger_rules: Ignore downstream trigger rules (Ignored for Airflow < 2.3)
+ :param aws_conn_id: aws connection to use
+ :param region: aws region to use
+ """
+
+ ui_color = "#33ffec" # Light blue
+
+ def __init__(
+ self,
+ *,
+ flow_name: str,
+ appflow_run_task_id: str,
+ ignore_downstream_trigger_rules: bool = True,
+ aws_conn_id: str = "aws_default",
+ region: Optional[str] = None,
+ **kwargs,
+ ) -> None:
+ if get_airflow_version() >= (2, 3):
+ kwargs["ignore_downstream_trigger_rules"] =
ignore_downstream_trigger_rules
+ else:
+ self.log.warning(
+ "Ignoring argument ignore_downstream_trigger_rules (%s) - Only
supported for Airflow >= 2.3",
+ ignore_downstream_trigger_rules,
+ )
+ super().__init__(
+ python_callable=self._has_new_records_func,
+ op_kwargs={
+ "flow_name": flow_name,
+ "appflow_run_task_id": appflow_run_task_id,
+ },
+ **kwargs,
+ )
+ self.aws_conn_id = aws_conn_id
+ self.region = region
+
+ @staticmethod
+ def _get_target_execution_id(
+ records: List["ExecutionRecordTypeDef"], execution_id: str
+ ) -> Optional["ExecutionRecordTypeDef"]:
+ for record in records:
+ if record.get("executionId") == execution_id:
+ return record
+ return None
+
+ @cached_property
+ def hook(self) -> AppflowHook:
+ """Create and return an AppflowHook."""
+ return AppflowHook(aws_conn_id=self.aws_conn_id, region_name=self.region)
+
+ def _has_new_records_func(self, **kwargs) -> bool:
+ appflow_task_id = kwargs["appflow_run_task_id"]
+ self.log.info("appflow_task_id: %s", appflow_task_id)
+ flow_name = kwargs["flow_name"]
+ self.log.info("flow_name: %s", flow_name)
+ af_client = self.hook.conn
+ task_instance = kwargs["task_instance"]
+ execution_id = task_instance.xcom_pull(task_ids=appflow_task_id, key="execution_id") # type: ignore
+ if not execution_id:
+ raise AirflowException(f"No execution_id found from task_id
{appflow_task_id}!")
+ self.log.info("execution_id: %s", execution_id)
+ args = {"flowName": flow_name, "maxResults": 100}
+ response: "DescribeFlowExecutionRecordsResponseTypeDef" = cast(
+ "DescribeFlowExecutionRecordsResponseTypeDef", {}
+ )
+ record = None
+
+ while not record:
+ if "nextToken" in response:
+ response = af_client.describe_flow_execution_records(nextToken=response["nextToken"], **args)
+ else:
+ response = af_client.describe_flow_execution_records(**args)
+ record = AppflowRecordsShortCircuitOperator._get_target_execution_id(
+ response["flowExecutions"], execution_id
+ )
+ if not record and "nextToken" not in response:
+ raise AirflowException(f"Flow ({execution_id}) without
recordsProcessed info.")
+
+ execution = record.get("executionResult", {})
+ if "recordsProcessed" not in execution:
+ raise AirflowException(f"Flow ({execution_id}) without
recordsProcessed info!")
+ records_processed = execution["recordsProcessed"]
+ self.log.info("records_processed: %d", records_processed)
+ task_instance.xcom_push("records_processed", records_processed) #
type: ignore
+ return records_processed > 0
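
As a usage note, here is a minimal sketch of how the run and short-circuit operators are meant to be wired together. The task IDs and flow name are illustrative; per the operators above, the execution ID travels between them via XCom under the key "execution_id".

    from airflow.providers.amazon.aws.operators.appflow import (
        AppflowRecordsShortCircuitOperator,
        AppflowRunOperator,
    )

    # The run operator pushes its AppFlow execution ID to XCom...
    dump = AppflowRunOperator(task_id="dump", source="salesforce", flow_name="my-flow")
    # ...and the short-circuit operator pulls it back via appflow_run_task_id,
    # skipping downstream tasks when that run processed zero records.
    guard = AppflowRecordsShortCircuitOperator(
        task_id="guard", flow_name="my-flow", appflow_run_task_id="dump"
    )
    dump >> guard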
diff --git a/airflow/providers/amazon/aws/operators/redshift_data.py b/airflow/providers/amazon/aws/operators/redshift_data.py
index f2d47da655..07854544a5 100644
--- a/airflow/providers/amazon/aws/operators/redshift_data.py
+++ b/airflow/providers/amazon/aws/operators/redshift_data.py
@@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
import sys
from time import sleep
from typing import TYPE_CHECKING, Any, Dict, Optional
diff --git a/airflow/providers/amazon/aws/secrets/secrets_manager.py b/airflow/providers/amazon/aws/secrets/secrets_manager.py
index 8b72f955ac..c740077a56 100644
--- a/airflow/providers/amazon/aws/secrets/secrets_manager.py
+++ b/airflow/providers/amazon/aws/secrets/secrets_manager.py
@@ -27,7 +27,7 @@ from urllib.parse import urlencode
import boto3
-from airflow.version import version as airflow_version
+from airflow.providers.amazon.aws.utils import get_airflow_version
if sys.version_info >= (3, 8):
from functools import cached_property
@@ -214,7 +214,7 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
:param conn_id: the connection id
:return: deserialized Connection
"""
- if _parse_version(airflow_version) >= (2, 3):
+ if get_airflow_version() >= (2, 3):
warnings.warn(
f"Method `{self.__class__.__name__}.get_conn_uri` is
deprecated and will be removed "
"in a future release. Please use method `get_conn_value`
instead.",
diff --git a/airflow/providers/amazon/aws/secrets/systems_manager.py b/airflow/providers/amazon/aws/secrets/systems_manager.py
index e45a5500ab..140fec11ef 100644
--- a/airflow/providers/amazon/aws/secrets/systems_manager.py
+++ b/airflow/providers/amazon/aws/secrets/systems_manager.py
@@ -23,7 +23,7 @@ from typing import Optional
import boto3
-from airflow.version import version as airflow_version
+from airflow.providers.amazon.aws.utils import get_airflow_version
if sys.version_info >= (3, 8):
from functools import cached_property
@@ -115,7 +115,7 @@ class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
:param conn_id: the connection id
:return: deserialized Connection
"""
- if _parse_version(airflow_version) >= (2, 3):
+ if get_airflow_version() >= (2, 3):
warnings.warn(
f"Method `{self.__class__.__name__}.get_conn_uri` is
deprecated and will be removed "
"in a future release. Please use method `get_conn_value`
instead.",
diff --git a/airflow/providers/amazon/aws/utils/__init__.py b/airflow/providers/amazon/aws/utils/__init__.py
index 13a83393a9..7f127f7178 100644
--- a/airflow/providers/amazon/aws/utils/__init__.py
+++ b/airflow/providers/amazon/aws/utils/__init__.py
@@ -14,3 +14,29 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+import re
+from datetime import datetime
+from typing import Tuple
+
+from airflow.version import version
+
+
+def datetime_to_epoch(date_time: datetime) -> int:
+ """Convert a datetime object to an epoch integer (seconds)."""
+ return int(date_time.timestamp())
+
+
+def datetime_to_epoch_ms(date_time: datetime) -> int:
+ """Convert a datetime object to an epoch integer (milliseconds)."""
+ return int(date_time.timestamp() * 1_000)
+
+
+def datetime_to_epoch_us(date_time: datetime) -> int:
+ """Convert a datetime object to an epoch integer (microseconds)."""
+ return int(date_time.timestamp() * 1_000_000)
+
+
+def get_airflow_version() -> Tuple[int, ...]:
+ val = re.sub(r'(\d+\.\d+\.\d+).*', lambda x: x.group(1), version)
+ return tuple(int(x) for x in val.split('.'))
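
As a quick illustration of the new helpers, a minimal sketch (the epoch value matches the example date; the get_airflow_version() output shown is only an example and depends on the installed Airflow):

    from datetime import datetime, timezone

    from airflow.providers.amazon.aws.utils import datetime_to_epoch_ms, get_airflow_version

    # A timezone-aware datetime avoids any dependence on the local timezone.
    dt = datetime(2022, 5, 26, tzinfo=timezone.utc)
    print(datetime_to_epoch_ms(dt))  # 1653523200000 - the value asserted in the tests below
    print(get_airflow_version())     # e.g. (2, 3, 2); pre-release suffixes are stripped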
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 01fb7e9104..4ad53e028f 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -213,6 +213,12 @@ integrations:
external-doc-url: https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html
logo: /integration-logos/aws/[email protected]
tags: [aws]
+ - integration-name: Amazon Appflow
+ external-doc-url: https://docs.aws.amazon.com/appflow/1.0/APIReference/Welcome.html
+ logo: /integration-logos/aws/Amazon_AppFlow_light.png
+ how-to-guide:
+ - /docs/apache-airflow-providers-amazon/operators/appflow.rst
+ tags: [aws]
operators:
- integration-name: Amazon Athena
@@ -311,6 +317,9 @@ operators:
- integration-name: Amazon QuickSight
python-modules:
- airflow.providers.amazon.aws.operators.quicksight
+ - integration-name: Amazon Appflow
+ python-modules:
+ - airflow.providers.amazon.aws.operators.appflow
sensors:
- integration-name: Amazon Athena
@@ -475,6 +484,9 @@ hooks:
- integration-name: AWS Security Token Service (STS)
python-modules:
- airflow.providers.amazon.aws.hooks.sts
+ - integration-name: Amazon Appflow
+ python-modules:
+ - airflow.providers.amazon.aws.hooks.appflow
transfers:
- source-integration-name: Amazon DynamoDB
diff --git a/docs/apache-airflow-providers-amazon/index.rst b/docs/apache-airflow-providers-amazon/index.rst
index 013dbd4dcc..7611caef8c 100644
--- a/docs/apache-airflow-providers-amazon/index.rst
+++ b/docs/apache-airflow-providers-amazon/index.rst
@@ -92,6 +92,7 @@ PIP package Version required
``jsonpath_ng`` ``>=1.5.3``
``mypy-boto3-rds`` ``>=1.21.0``
``mypy-boto3-redshift-data`` ``>=1.21.0``
+``mypy-boto3-appflow`` ``>=1.21.0``
``pandas`` ``>=0.17.1``
``redshift_connector`` ``>=2.0.888``
``sqlalchemy_redshift`` ``>=0.8.6``
diff --git a/docs/apache-airflow-providers-amazon/operators/appflow.rst b/docs/apache-airflow-providers-amazon/operators/appflow.rst
new file mode 100644
index 0000000000..daf3d873dc
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/appflow.rst
@@ -0,0 +1,146 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+==============
+Amazon AppFlow
+==============
+
+`Amazon AppFlow <https://aws.amazon.com/appflow/>`__ is a fully managed integration service
+that enables you to securely transfer data between Software-as-a-Service (SaaS) applications
+like Salesforce, SAP, Zendesk, Slack, and ServiceNow, and AWS services like Amazon S3 and
+Amazon Redshift, in just a few clicks. With AppFlow, you can run data flows at enterprise
+scale at the frequency you choose - on a schedule, in response to a business event, or on
+demand. You can configure data transformation capabilities like filtering and validation to
+generate rich, ready-to-use data as part of the flow itself, without additional steps.
+AppFlow automatically encrypts data in motion, and allows users to restrict data from
+flowing over the public Internet for SaaS applications that are integrated with
+AWS PrivateLink, reducing exposure to security threats.
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+Operators
+---------
+
+.. _howto/operator:AppflowRunOperator:
+
+Run Flow
+========
+
+To run an AppFlow flow keeping all filters as is, use:
+:class:`~airflow.providers.amazon.aws.operators.appflow.AppflowRunOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_appflow.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_appflow_run]
+ :end-before: [END howto_operator_appflow_run]
+
+.. note::
+ Supported sources: Salesforce, Zendesk
+
+.. _howto/operator:AppflowRunFullOperator:
+
+Run Flow Full
+=============
+
+To run an AppFlow flow removing all filters, use:
+:class:`~airflow.providers.amazon.aws.operators.appflow.AppflowRunFullOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_appflow.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_appflow_run_full]
+ :end-before: [END howto_operator_appflow_run_full]
+
+.. note::
+ Supported sources: Salesforce, Zendesk
+
+.. _howto/operator:AppflowRunDailyOperator:
+
+Run Flow Daily
+==============
+
+To run an AppFlow flow filtering daily records, use:
+:class:`~airflow.providers.amazon.aws.operators.appflow.AppflowRunDailyOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_appflow.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_appflow_run_daily]
+ :end-before: [END howto_operator_appflow_run_daily]
+
+.. note::
+ Supported sources: Salesforce
+
+.. _howto/operator:AppflowRunBeforeOperator:
+
+Run Flow Before
+===============
+
+To run an AppFlow flow filtering future records and selecting the past ones, use:
+:class:`~airflow.providers.amazon.aws.operators.appflow.AppflowRunBeforeOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_appflow.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_appflow_run_before]
+ :end-before: [END howto_operator_appflow_run_before]
+
+.. note::
+ Supported sources: Salesforce
+
+.. _howto/operator:AppflowRunAfterOperator:
+
+Run Flow After
+==============
+
+To run an AppFlow flow filtering past records and selecting the future ones, use:
+:class:`~airflow.providers.amazon.aws.operators.appflow.AppflowRunAfterOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_appflow.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_appflow_run_after]
+ :end-before: [END howto_operator_appflow_run_after]
+
+.. note::
+ Supported sources: Salesforce, Zendesk
+
+.. _howto/operator:AppflowRecordsShortCircuitOperator:
+
+Skipping Tasks For Empty Runs
+=============================
+
+To skip tasks when an AppFlow run returns zero records, use:
+:class:`~airflow.providers.amazon.aws.operators.appflow.AppflowRecordsShortCircuitOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_appflow.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_appflow_shortcircuit]
+ :end-before: [END howto_operator_appflow_shortcircuit]
+
+.. note::
+ Supported sources: Salesforce, Zendesk
+
+Reference
+---------
+
+* `AWS boto3 library documentation for Amazon AppFlow <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/appflow.html>`__
diff --git a/docs/integration-logos/aws/Amazon_AppFlow_light.png b/docs/integration-logos/aws/Amazon_AppFlow_light.png
new file mode 100644
index 0000000000..fa1d4c64d6
Binary files /dev/null and b/docs/integration-logos/aws/Amazon_AppFlow_light.png differ
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index b43440f9ee..5bee2e2783 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -17,6 +17,7 @@ Aneesh
AnnotateTextResponse
Ansible
AppBuilder
+Appflow
ArangoDB
Arg
Args
diff --git a/setup.py b/setup.py
index ebcf944a69..3edaf59c6a 100644
--- a/setup.py
+++ b/setup.py
@@ -200,6 +200,7 @@ amazon = [
pandas_requirement,
'mypy-boto3-rds>=1.21.0',
'mypy-boto3-redshift-data>=1.21.0',
+ 'mypy-boto3-appflow>=1.21.0',
]
apache_beam = [
'apache-beam>=2.39.0',
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index d0b9e1c060..bddb425f07 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -430,6 +430,7 @@ class TestAmazonProviderProjectStructure(ExampleCoverageTest):
'airflow.providers.amazon.aws.sensors.emr.EmrBaseSensor',
'airflow.providers.amazon.aws.sensors.rds.RdsBaseSensor',
'airflow.providers.amazon.aws.sensors.sagemaker.SageMakerBaseSensor',
+ 'airflow.providers.amazon.aws.operators.appflow.AppflowBaseOperator',
}
MISSING_EXAMPLES_FOR_CLASSES = {
diff --git a/tests/providers/amazon/aws/hooks/test_appflow.py b/tests/providers/amazon/aws/hooks/test_appflow.py
new file mode 100644
index 0000000000..f50e6e9a86
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_appflow.py
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime
+from unittest import mock
+from unittest.mock import ANY
+
+import pytest
+
+from airflow.providers.amazon.aws.hooks.appflow import AppflowHook
+from airflow.utils import timezone
+
+FLOW_NAME = "flow0"
+EXECUTION_ID = "ex_id"
+CONNECTION_TYPE = "Salesforce"
+REGION_NAME = "us-east-1"
+AWS_CONN_ID = "aws_default"
+
+
+@pytest.fixture
+def hook():
+ with mock.patch("airflow.providers.amazon.aws.hooks.appflow.AppflowHook.__init__", return_value=None):
+ with mock.patch("airflow.providers.amazon.aws.hooks.appflow.AppflowHook.conn") as mock_conn:
+ mock_conn.describe_flow.return_value = {
+ 'sourceFlowConfig': {'connectorType': CONNECTION_TYPE},
+ 'tasks': [],
+ 'triggerConfig': {'triggerProperties': None},
+ 'flowName': FLOW_NAME,
+ 'destinationFlowConfigList': {},
+ 'lastRunExecutionDetails': {
+ 'mostRecentExecutionStatus': 'Successful',
+ 'mostRecentExecutionTime': datetime(3000, 1, 1, tzinfo=timezone.utc),
+ },
+ }
+ mock_conn.update_flow.return_value = {}
+ mock_conn.start_flow.return_value = {"executionId": EXECUTION_ID}
+ mock_conn.describe_flow_execution_records.return_value = {
+ "flowExecutions": [{"executionId": EXECUTION_ID,
"executionResult": {"recordsProcessed": 1}}]
+ }
+ yield AppflowHook(aws_conn_id=AWS_CONN_ID, region_name=REGION_NAME)
+
+
+def test_conn_attributes(hook):
+ assert hasattr(hook, 'conn')
+ conn = hook.conn
+ assert conn is hook.conn, "AppflowHook conn property non-cached"
+
+
+def test_run_flow(hook):
+ hook.run_flow(flow_name=FLOW_NAME)
+ hook.conn.describe_flow.assert_called_with(flowName=FLOW_NAME)
+ assert hook.conn.describe_flow.call_count == 1
+ hook.conn.start_flow.assert_called_once_with(flowName=FLOW_NAME)
+
+
+def test_update_flow_filter(hook):
+ tasks = [
+ {
+ 'taskType': 'Filter',
+ 'connectorOperator': {'Salesforce': 'GREATER_THAN'},
+ 'sourceFields': ['col0'],
+ 'taskProperties': {'DATA_TYPE': 'datetime', 'VALUE': '1653523200000'},
+ }
+ ]
+ hook.update_flow_filter(flow_name=FLOW_NAME, filter_tasks=tasks, set_trigger_ondemand=True)
+ hook.conn.describe_flow.assert_called_with(flowName=FLOW_NAME)
+ assert hook.conn.describe_flow.call_count == 1
+ hook.conn.update_flow.assert_called_once_with(
+ flowName=FLOW_NAME,
+ tasks=tasks,
+ description=ANY,
+ destinationFlowConfigList=ANY,
+ sourceFlowConfig=ANY,
+ triggerConfig=ANY,
+ )
diff --git a/tests/providers/amazon/aws/operators/test_appflow.py b/tests/providers/amazon/aws/operators/test_appflow.py
new file mode 100644
index 0000000000..7bbd515693
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_appflow.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime
+from unittest import mock
+from unittest.mock import ANY
+
+import pytest
+
+from airflow.providers.amazon.aws.operators.appflow import (
+ AppflowRecordsShortCircuitOperator,
+ AppflowRunAfterOperator,
+ AppflowRunBeforeOperator,
+ AppflowRunDailyOperator,
+ AppflowRunFullOperator,
+ AppflowRunOperator,
+)
+from airflow.utils import timezone
+
+CONN_ID = "aws_default"
+DAG_ID = "dag_id"
+TASK_ID = "task_id"
+SHORT_CIRCUIT_TASK_ID = "short_circuit_task_id"
+FLOW_NAME = "flow0"
+EXECUTION_ID = "ex_id"
+CONNECTION_TYPE = "Salesforce"
+SOURCE = "salesforce"
+
+DUMP_COMMON_ARGS = {"aws_conn_id": CONN_ID, "task_id": TASK_ID, "source":
SOURCE, "flow_name": FLOW_NAME}
+
+
+@pytest.fixture
+def ctx(create_task_instance):
+ ti = create_task_instance(
+ dag_id=DAG_ID,
+ task_id=TASK_ID,
+ schedule_interval="0 12 * * *",
+ )
+ yield {"task_instance": ti}
+
+
+@pytest.fixture
+def appflow_conn():
+ with mock.patch("airflow.providers.amazon.aws.hooks.appflow.AppflowHook.conn") as mock_conn:
+ mock_conn.describe_flow.return_value = {
+ 'sourceFlowConfig': {'connectorType': CONNECTION_TYPE},
+ 'tasks': [],
+ 'triggerConfig': {'triggerProperties': None},
+ 'flowName': FLOW_NAME,
+ 'destinationFlowConfigList': {},
+ 'lastRunExecutionDetails': {
+ 'mostRecentExecutionStatus': 'Successful',
+ 'mostRecentExecutionTime': datetime(3000, 1, 1, tzinfo=timezone.utc),
+ },
+ }
+ mock_conn.update_flow.return_value = {}
+ mock_conn.start_flow.return_value = {"executionId": EXECUTION_ID}
+ mock_conn.describe_flow_execution_records.return_value = {
+ "flowExecutions": [{"executionId": EXECUTION_ID,
"executionResult": {"recordsProcessed": 1}}]
+ }
+ yield mock_conn
+
+
+def run_assertions_base(appflow_conn, tasks):
+ appflow_conn.describe_flow.assert_called_with(flowName=FLOW_NAME)
+ assert appflow_conn.describe_flow.call_count == 3
+ appflow_conn.update_flow.assert_called_once_with(
+ flowName=FLOW_NAME,
+ tasks=tasks,
+ description=ANY,
+ destinationFlowConfigList=ANY,
+ sourceFlowConfig=ANY,
+ triggerConfig=ANY,
+ )
+ appflow_conn.start_flow.assert_called_once_with(flowName=FLOW_NAME)
+
+
+def test_run(appflow_conn, ctx):
+ operator = AppflowRunOperator(**DUMP_COMMON_ARGS)
+ operator.execute(ctx) # type: ignore
+ appflow_conn.describe_flow.assert_called_with(flowName=FLOW_NAME)
+ assert appflow_conn.describe_flow.call_count == 2
+ appflow_conn.start_flow.assert_called_once_with(flowName=FLOW_NAME)
+
+
+def test_run_full(appflow_conn, ctx):
+ operator = AppflowRunFullOperator(**DUMP_COMMON_ARGS)
+ operator.execute(ctx) # type: ignore
+ run_assertions_base(appflow_conn, [])
+
+
+def test_run_after(appflow_conn, ctx):
+ operator = AppflowRunAfterOperator(source_field="col0",
filter_date="2022-05-26", **DUMP_COMMON_ARGS)
+ operator.execute(ctx) # type: ignore
+ run_assertions_base(
+ appflow_conn,
+ [
+ {
+ 'taskType': 'Filter',
+ 'connectorOperator': {'Salesforce': 'GREATER_THAN'},
+ 'sourceFields': ['col0'],
+ 'taskProperties': {'DATA_TYPE': 'datetime', 'VALUE': '1653523200000'},
+ }
+ ],
+ )
+
+
+def test_run_before(appflow_conn, ctx):
+ operator = AppflowRunBeforeOperator(source_field="col0",
filter_date="2022-05-26", **DUMP_COMMON_ARGS)
+ operator.execute(ctx) # type: ignore
+ run_assertions_base(
+ appflow_conn,
+ [
+ {
+ 'taskType': 'Filter',
+ 'connectorOperator': {'Salesforce': 'LESS_THAN'},
+ 'sourceFields': ['col0'],
+ 'taskProperties': {'DATA_TYPE': 'datetime', 'VALUE': '1653523200000'},
+ }
+ ],
+ )
+
+
+def test_run_daily(appflow_conn, ctx):
+ operator = AppflowRunDailyOperator(source_field="col0",
filter_date="2022-05-26", **DUMP_COMMON_ARGS)
+ operator.execute(ctx) # type: ignore
+ run_assertions_base(
+ appflow_conn,
+ [
+ {
+ 'taskType': 'Filter',
+ 'connectorOperator': {'Salesforce': 'BETWEEN'},
+ 'sourceFields': ['col0'],
+ 'taskProperties': {
+ 'DATA_TYPE': 'datetime',
+ 'LOWER_BOUND': '1653523199999',
+ 'UPPER_BOUND': '1653609600000',
+ },
+ }
+ ],
+ )
+
+
+def test_short_circuit(appflow_conn, ctx):
+ with mock.patch("airflow.models.TaskInstance.xcom_pull") as mock_xcom_pull:
+ with mock.patch("airflow.models.TaskInstance.xcom_push") as
mock_xcom_push:
+ mock_xcom_pull.return_value = EXECUTION_ID
+ operator = AppflowRecordsShortCircuitOperator(
+ task_id=SHORT_CIRCUIT_TASK_ID,
+ flow_name=FLOW_NAME,
+ appflow_run_task_id=TASK_ID,
+ )
+ operator.execute(ctx) # type: ignore
+ appflow_conn.describe_flow_execution_records.assert_called_once_with(
+ flowName=FLOW_NAME, maxResults=100
+ )
+ mock_xcom_push.assert_called_with("records_processed", 1)
diff --git a/airflow/providers/amazon/aws/utils/__init__.py b/tests/providers/amazon/aws/utils/test_utils.py
similarity index 58%
copy from airflow/providers/amazon/aws/utils/__init__.py
copy to tests/providers/amazon/aws/utils/test_utils.py
index 13a83393a9..9a6bc198c1 100644
--- a/airflow/providers/amazon/aws/utils/__init__.py
+++ b/tests/providers/amazon/aws/utils/test_utils.py
@@ -14,3 +14,31 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+from datetime import datetime
+
+from airflow.providers.amazon.aws.utils import (
+ datetime_to_epoch,
+ datetime_to_epoch_ms,
+ datetime_to_epoch_us,
+ get_airflow_version,
+)
+
+DT = datetime(2000, 1, 1)
+EPOCH = 946_684_800
+
+
+def test_datetime_to_epoch():
+ assert datetime_to_epoch(DT) == EPOCH
+
+
+def test_datetime_to_epoch_ms():
+ assert datetime_to_epoch_ms(DT) == EPOCH * 1000
+
+
+def test_datetime_to_epoch_us():
+ assert datetime_to_epoch_us(DT) == EPOCH * 1_000_000
+
+
+def test_get_airflow_version():
+ assert len(get_airflow_version()) == 3