This is an automated email from the ASF dual-hosted git repository.

joshfell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e477f4ba6c Amazon appflow (#24057)
e477f4ba6c is described below

commit e477f4ba6cd15fabbfe5210c99947bcb70ddac4f
Author: Igor Tavares <[email protected]>
AuthorDate: Tue Jun 21 20:14:46 2022 -0300

    Amazon appflow (#24057)
    
    * Add Amazon AppFlow hook.
    
    * Add Amazon AppFlow operators.
    
    * Add Amazon AppFlow examples.
    
    * Add Amazon Appflow docs.
    
    * Apply comments/docs patterns.
    
    * Removing the "private" attribute signal and more.
    
    * Fix task_ids for example_appflow.
    
    * Move datetime_to_epoch() to utils and more.
    
    * Fix the AppflowBaseOperator name.
    
    * Ignore AppflowBaseOperator during structure check.
    
    * test_short_circuit refactor.
    
    * Add get_airflow_version.
    
    * Update airflow/providers/amazon/aws/hooks/appflow.py
    
    Co-authored-by: Josh Fell <[email protected]>
    
    * Update airflow/providers/amazon/aws/operators/appflow.py
    
    Co-authored-by: Josh Fell <[email protected]>
    
    * Update airflow/providers/amazon/aws/operators/appflow.py
    
    Co-authored-by: Josh Fell <[email protected]>
    
    * Update airflow/providers/amazon/aws/operators/appflow.py
    
    Co-authored-by: Josh Fell <[email protected]>
    
    * Update airflow/providers/amazon/aws/operators/appflow.py
    
    Co-authored-by: Josh Fell <[email protected]>
    
    * Update airflow/providers/amazon/aws/operators/appflow.py
    
    Co-authored-by: Josh Fell <[email protected]>
    
    * Addressing Josh's requests.
    
    * Add cached_property to AppflowHook
    
    * Update airflow/providers/amazon/aws/hooks/appflow.py
    
    Co-authored-by: Josh Fell <[email protected]>
    
    * Update airflow/providers/amazon/aws/operators/appflow.py
    
    Co-authored-by: Josh Fell <[email protected]>
    
    * Update airflow/providers/amazon/aws/operators/appflow.py
    
    Co-authored-by: Josh Fell <[email protected]>
    
    * Update Josh's comment.
    
    * Update cached_property import.
    
    * Fix mypy.
    
    Co-authored-by: Josh Fell <[email protected]>
---
 .../amazon/aws/example_dags/example_appflow.py     | 110 +++++
 airflow/providers/amazon/aws/hooks/appflow.py      | 145 ++++++
 airflow/providers/amazon/aws/operators/appflow.py  | 485 +++++++++++++++++++++
 .../amazon/aws/operators/redshift_data.py          |   1 +
 .../amazon/aws/secrets/secrets_manager.py          |   4 +-
 .../amazon/aws/secrets/systems_manager.py          |   4 +-
 airflow/providers/amazon/aws/utils/__init__.py     |  26 ++
 airflow/providers/amazon/provider.yaml             |  12 +
 docs/apache-airflow-providers-amazon/index.rst     |   1 +
 .../operators/appflow.rst                          | 146 +++++++
 .../integration-logos/aws/Amazon_AppFlow_light.png | Bin 0 -> 59266 bytes
 docs/spelling_wordlist.txt                         |   1 +
 setup.py                                           |   1 +
 tests/always/test_project_structure.py             |   1 +
 tests/providers/amazon/aws/hooks/test_appflow.py   |  90 ++++
 .../providers/amazon/aws/operators/test_appflow.py | 171 ++++++++
 .../providers/amazon/aws/utils/test_utils.py       |  28 ++
 17 files changed, 1222 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_appflow.py b/airflow/providers/amazon/aws/example_dags/example_appflow.py
new file mode 100644
index 0000000000..06903c73d9
--- /dev/null
+++ b/airflow/providers/amazon/aws/example_dags/example_appflow.py
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.models.baseoperator import chain
+from airflow.operators.bash import BashOperator
+from airflow.providers.amazon.aws.operators.appflow import (
+    AppflowRecordsShortCircuitOperator,
+    AppflowRunAfterOperator,
+    AppflowRunBeforeOperator,
+    AppflowRunDailyOperator,
+    AppflowRunFullOperator,
+    AppflowRunOperator,
+)
+
+SOURCE_NAME = "salesforce"
+FLOW_NAME = "salesforce-campaign"
+
+with DAG(
+    "example_appflow",
+    schedule_interval=None,
+    start_date=datetime(2022, 1, 1),
+    catchup=False,
+    tags=["example"],
+) as dag:
+
+    # [START howto_operator_appflow_run]
+    campaign_dump = AppflowRunOperator(
+        task_id="campaign_dump",
+        source=SOURCE_NAME,
+        flow_name=FLOW_NAME,
+    )
+    # [END howto_operator_appflow_run]
+
+    # [START howto_operator_appflow_run_full]
+    campaign_dump_full = AppflowRunFullOperator(
+        task_id="campaign_dump_full",
+        source=SOURCE_NAME,
+        flow_name=FLOW_NAME,
+    )
+    # [END howto_operator_appflow_run_full]
+
+    # [START howto_operator_appflow_run_daily]
+    campaign_dump_daily = AppflowRunDailyOperator(
+        task_id="campaign_dump_daily",
+        source=SOURCE_NAME,
+        flow_name=FLOW_NAME,
+        source_field="LastModifiedDate",
+        filter_date="{{ ds }}",
+    )
+    # [END howto_operator_appflow_run_daily]
+
+    # [START howto_operator_appflow_run_before]
+    campaign_dump_before = AppflowRunBeforeOperator(
+        task_id="campaign_dump_before",
+        source=SOURCE_NAME,
+        flow_name=FLOW_NAME,
+        source_field="LastModifiedDate",
+        filter_date="{{ ds }}",
+    )
+    # [END howto_operator_appflow_run_before]
+
+    # [START howto_operator_appflow_run_after]
+    campaign_dump_after = AppflowRunAfterOperator(
+        task_id="campaign_dump_after",
+        source=SOURCE_NAME,
+        flow_name=FLOW_NAME,
+        source_field="LastModifiedDate",
+        filter_date="3000-01-01",  # Future date, so no records to dump
+    )
+    # [END howto_operator_appflow_run_after]
+
+    # [START howto_operator_appflow_shortcircuit]
+    campaign_dump_short_circuit = AppflowRecordsShortCircuitOperator(
+        task_id="campaign_dump_short_circuit",
+        flow_name=FLOW_NAME,
+        appflow_run_task_id="campaign_dump_after",  # Should shortcircuit, no records expected
+    )
+    # [END howto_operator_appflow_shortcircuit]
+
+    should_be_skipped = BashOperator(
+        task_id="should_be_skipped",
+        bash_command="echo 1",
+    )
+
+    chain(
+        campaign_dump,
+        campaign_dump_full,
+        campaign_dump_daily,
+        campaign_dump_before,
+        campaign_dump_after,
+        campaign_dump_short_circuit,
+        should_be_skipped,
+    )
diff --git a/airflow/providers/amazon/aws/hooks/appflow.py b/airflow/providers/amazon/aws/hooks/appflow.py
new file mode 100644
index 0000000000..e2b628bf3d
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/appflow.py
@@ -0,0 +1,145 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+from datetime import datetime, timezone
+from time import sleep
+from typing import TYPE_CHECKING, List
+
+from airflow.compat.functools import cached_property
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+if TYPE_CHECKING:
+    from mypy_boto3_appflow.client import AppflowClient
+    from mypy_boto3_appflow.type_defs import TaskTypeDef
+
+
+class AppflowHook(AwsBaseHook):
+    """
+    Interact with Amazon Appflow, using the boto3 library.
+    Hook attribute ``conn`` has all methods that listed in documentation.
+
+    .. seealso::
+        - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/appflow.html
+        - https://docs.aws.amazon.com/appflow/1.0/APIReference/Welcome.html
+
+    Additional arguments (such as ``aws_conn_id`` or ``region_name``) may be specified and
+        are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    """
+
+    EVENTUAL_CONSISTENCY_OFFSET: int = 15  # seconds
+    EVENTUAL_CONSISTENCY_POLLING: int = 10  # seconds
+
+    def __init__(self, *args, **kwargs) -> None:
+        kwargs["client_type"] = "appflow"
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def conn(self) -> 'AppflowClient':
+        """Get the underlying boto3 Appflow client (cached)"""
+        return super().conn
+
+    def run_flow(self, flow_name: str, poll_interval: int = 20) -> str:
+        """
+        Execute an AppFlow run.
+
+        :param flow_name: The flow name
+        :param poll_interval: Time (seconds) to wait between two consecutive calls to check the run status
+        :return: The run execution ID
+        """
+        ts_before: datetime = datetime.now(timezone.utc)
+        sleep(self.EVENTUAL_CONSISTENCY_OFFSET)
+        response_start = self.conn.start_flow(flowName=flow_name)
+        execution_id = response_start["executionId"]
+        self.log.info("executionId: %s", execution_id)
+
+        response_desc = self.conn.describe_flow(flowName=flow_name)
+        last_exec_details = response_desc["lastRunExecutionDetails"]
+
+        # Wait Appflow eventual consistence
+        self.log.info("Waiting for Appflow eventual consistence...")
+        while (
+            response_desc.get("lastRunExecutionDetails", {}).get(
+                "mostRecentExecutionTime", datetime(1970, 1, 1, tzinfo=timezone.utc)
+            )
+            < ts_before
+        ):
+            sleep(self.EVENTUAL_CONSISTENCY_POLLING)
+            response_desc = self.conn.describe_flow(flowName=flow_name)
+            last_exec_details = response_desc["lastRunExecutionDetails"]
+
+        # Wait flow stops
+        self.log.info("Waiting for flow run...")
+        while (
+            "mostRecentExecutionStatus" not in last_exec_details
+            or last_exec_details["mostRecentExecutionStatus"] == "InProgress"
+        ):
+            sleep(poll_interval)
+            response_desc = self.conn.describe_flow(flowName=flow_name)
+            last_exec_details = response_desc["lastRunExecutionDetails"]
+
+        self.log.info("lastRunExecutionDetails: %s", last_exec_details)
+
+        if last_exec_details["mostRecentExecutionStatus"] == "Error":
+            raise Exception(f"Flow error:\n{json.dumps(response_desc, default=str)}")
+
+        return execution_id
+
+    def update_flow_filter(
+        self, flow_name: str, filter_tasks: List["TaskTypeDef"], set_trigger_ondemand: bool = False
+    ) -> None:
+        """
+        Update the flow task filter.
+        All filters will be removed if an empty array is passed to filter_tasks.
+
+        :param flow_name: The flow name
+        :param filter_tasks: List flow tasks to be added
+        :param set_trigger_ondemand: If True, set the trigger to on-demand; otherwise, keep the trigger as is
+        :return: None
+        """
+        response = self.conn.describe_flow(flowName=flow_name)
+        connector_type = response["sourceFlowConfig"]["connectorType"]
+        tasks: List["TaskTypeDef"] = []
+
+        # cleanup old filter tasks
+        for task in response["tasks"]:
+            if (
+                task["taskType"] == "Filter"
+                and task.get("connectorOperator", {}).get(connector_type) != "PROJECTION"
+            ):
+                self.log.info("Removing task: %s", task)
+            else:
+                tasks.append(task)  # List of non-filter tasks
+
+        tasks += filter_tasks  # Add the new filter tasks
+
+        if set_trigger_ondemand:
+            # Clean up attribute to force on-demand trigger
+            del response["triggerConfig"]["triggerProperties"]
+
+        self.conn.update_flow(
+            flowName=response["flowName"],
+            destinationFlowConfigList=response["destinationFlowConfigList"],
+            sourceFlowConfig=response["sourceFlowConfig"],
+            triggerConfig=response["triggerConfig"],
+            description=response.get("description", "Flow description."),
+            tasks=tasks,
+        )
diff --git a/airflow/providers/amazon/aws/operators/appflow.py b/airflow/providers/amazon/aws/operators/appflow.py
new file mode 100644
index 0000000000..a63ec8ccdb
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/appflow.py
@@ -0,0 +1,485 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING, List, Optional, cast
+
+from airflow.compat.functools import cached_property
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.operators.python import ShortCircuitOperator
+from airflow.providers.amazon.aws.hooks.appflow import AppflowHook
+from airflow.providers.amazon.aws.utils import datetime_to_epoch_ms, get_airflow_version
+
+if TYPE_CHECKING:
+    from mypy_boto3_appflow.type_defs import (
+        DescribeFlowExecutionRecordsResponseTypeDef,
+        ExecutionRecordTypeDef,
+        TaskTypeDef,
+    )
+
+    from airflow.utils.context import Context
+
+
+SUPPORTED_SOURCES = {"salesforce", "zendesk"}
+MANDATORY_FILTER_DATE_MSG = "The filter_date argument is mandatory for {entity}!"
+NOT_SUPPORTED_SOURCE_MSG = "Source {source} is not supported for {entity}!"
+
+
+class AppflowBaseOperator(BaseOperator):
+    """
+    Amazon Appflow Base Operator class (not supposed to be used directly in DAGs).
+
+    :param source: The source name (Supported: salesforce, zendesk)
+    :param flow_name: The flow name
+    :param flow_update: A boolean to enable/disable a flow update before the run
+    :param source_field: The field name to apply filters
+    :param filter_date: The date value (or template) to be used in filters.
+    :param poll_interval: how often in seconds to check the query status
+    :param aws_conn_id: aws connection to use
+    :param region: aws region to use
+    """
+
+    ui_color = "#2bccbd"
+
+    def __init__(
+        self,
+        source: str,
+        flow_name: str,
+        flow_update: bool,
+        source_field: Optional[str] = None,
+        filter_date: Optional[str] = None,
+        poll_interval: int = 20,
+        aws_conn_id: str = "aws_default",
+        region: Optional[str] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        if source not in SUPPORTED_SOURCES:
+            raise ValueError(f"{source} is not a supported source (options: {SUPPORTED_SOURCES})!")
+        self.filter_date = filter_date
+        self.flow_name = flow_name
+        self.source = source
+        self.source_field = source_field
+        self.poll_interval = poll_interval
+        self.aws_conn_id = aws_conn_id
+        self.region = region
+        self.flow_update = flow_update
+
+    @cached_property
+    def hook(self) -> AppflowHook:
+        """Create and return an AppflowHook."""
+        return AppflowHook(aws_conn_id=self.aws_conn_id, region_name=self.region)
+
+    def execute(self, context: "Context") -> None:
+        self.filter_date_parsed: Optional[datetime] = (
+            datetime.fromisoformat(self.filter_date) if self.filter_date else None
+        )
+        self.connector_type = self._get_connector_type()
+        if self.flow_update:
+            self._update_flow()
+        self._run_flow(context)
+
+    def _get_connector_type(self) -> str:
+        response = self.hook.conn.describe_flow(flowName=self.flow_name)
+        connector_type = response["sourceFlowConfig"]["connectorType"]
+        if self.source != connector_type.lower():
+            raise ValueError(f"Incompatible source ({self.source} and connector type ({connector_type})!")
+        return connector_type
+
+    def _update_flow(self) -> None:
+        self.hook.update_flow_filter(flow_name=self.flow_name, filter_tasks=[], set_trigger_ondemand=True)
+
+    def _run_flow(self, context) -> str:
+        execution_id = self.hook.run_flow(flow_name=self.flow_name, poll_interval=self.poll_interval)
+        task_instance = context["task_instance"]
+        task_instance.xcom_push("execution_id", execution_id)
+        return execution_id
+
+
+class AppflowRunOperator(AppflowBaseOperator):
+    """
+    Execute a Appflow run with filters as is.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AppflowRunOperator`
+
+    :param source: The source name (Supported: salesforce, zendesk)
+    :param flow_name: The flow name
+    :param poll_interval: how often in seconds to check the query status
+    :param aws_conn_id: aws connection to use
+    :param region: aws region to use
+    """
+
+    def __init__(
+        self,
+        source: str,
+        flow_name: str,
+        poll_interval: int = 20,
+        aws_conn_id: str = "aws_default",
+        region: Optional[str] = None,
+        **kwargs,
+    ) -> None:
+        if source not in {"salesforce", "zendesk"}:
+            raise ValueError(NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunOperator"))
+        super().__init__(
+            source=source,
+            flow_name=flow_name,
+            flow_update=False,
+            source_field=None,
+            filter_date=None,
+            poll_interval=poll_interval,
+            aws_conn_id=aws_conn_id,
+            region=region,
+            **kwargs,
+        )
+
+
+class AppflowRunFullOperator(AppflowBaseOperator):
+    """
+    Execute a Appflow full run removing any filter.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AppflowRunFullOperator`
+
+    :param source: The source name (Supported: salesforce, zendesk)
+    :param flow_name: The flow name
+    :param poll_interval: how often in seconds to check the query status
+    :param aws_conn_id: aws connection to use
+    :param region: aws region to use
+    """
+
+    def __init__(
+        self,
+        source: str,
+        flow_name: str,
+        poll_interval: int = 20,
+        aws_conn_id: str = "aws_default",
+        region: Optional[str] = None,
+        **kwargs,
+    ) -> None:
+        if source not in {"salesforce", "zendesk"}:
+            raise ValueError(NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunFullOperator"))
+        super().__init__(
+            source=source,
+            flow_name=flow_name,
+            flow_update=True,
+            source_field=None,
+            filter_date=None,
+            poll_interval=poll_interval,
+            aws_conn_id=aws_conn_id,
+            region=region,
+            **kwargs,
+        )
+
+
+class AppflowRunBeforeOperator(AppflowBaseOperator):
+    """
+    Execute a Appflow run after updating the filters to select only previous data.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AppflowRunBeforeOperator`
+
+    :param source: The source name (Supported: salesforce)
+    :param flow_name: The flow name
+    :param source_field: The field name to apply filters
+    :param filter_date: The date value (or template) to be used in filters.
+    :param poll_interval: how often in seconds to check the query status
+    :param aws_conn_id: aws connection to use
+    :param region: aws region to use
+    """
+
+    template_fields = ("filter_date",)
+
+    def __init__(
+        self,
+        source: str,
+        flow_name: str,
+        source_field: str,
+        filter_date: str,
+        poll_interval: int = 20,
+        aws_conn_id: str = "aws_default",
+        region: Optional[str] = None,
+        **kwargs,
+    ) -> None:
+        if not filter_date:
+            raise ValueError(MANDATORY_FILTER_DATE_MSG.format(entity="AppflowRunBeforeOperator"))
+        if source != "salesforce":
+            raise ValueError(
+                NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunBeforeOperator")
+            )
+        super().__init__(
+            source=source,
+            flow_name=flow_name,
+            flow_update=True,
+            source_field=source_field,
+            filter_date=filter_date,
+            poll_interval=poll_interval,
+            aws_conn_id=aws_conn_id,
+            region=region,
+            **kwargs,
+        )
+
+    def _update_flow(self) -> None:
+        if not self.filter_date_parsed:
+            raise ValueError(f"Invalid filter_date argument parser value: {self.filter_date_parsed}")
+        if not self.source_field:
+            raise ValueError(f"Invalid source_field argument value: {self.source_field}")
+        filter_task: "TaskTypeDef" = {
+            "taskType": "Filter",
+            "connectorOperator": {self.connector_type: "LESS_THAN"},  # type: ignore
+            "sourceFields": [self.source_field],
+            "taskProperties": {
+                "DATA_TYPE": "datetime",
+                "VALUE": str(datetime_to_epoch_ms(self.filter_date_parsed)),
+            },  # NOT inclusive
+        }
+        self.hook.update_flow_filter(
+            flow_name=self.flow_name, filter_tasks=[filter_task], set_trigger_ondemand=True
+        )
+
+
+class AppflowRunAfterOperator(AppflowBaseOperator):
+    """
+    Execute a Appflow run after updating the filters to select only future data.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AppflowRunAfterOperator`
+
+    :param source: The source name (Supported: salesforce, zendesk)
+    :param flow_name: The flow name
+    :param source_field: The field name to apply filters
+    :param filter_date: The date value (or template) to be used in filters.
+    :param poll_interval: how often in seconds to check the query status
+    :param aws_conn_id: aws connection to use
+    :param region: aws region to use
+    """
+
+    template_fields = ("filter_date",)
+
+    def __init__(
+        self,
+        source: str,
+        flow_name: str,
+        source_field: str,
+        filter_date: str,
+        poll_interval: int = 20,
+        aws_conn_id: str = "aws_default",
+        region: Optional[str] = None,
+        **kwargs,
+    ) -> None:
+        if not filter_date:
+            raise ValueError(MANDATORY_FILTER_DATE_MSG.format(entity="AppflowRunAfterOperator"))
+        if source not in {"salesforce", "zendesk"}:
+            raise ValueError(NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunAfterOperator"))
+        super().__init__(
+            source=source,
+            flow_name=flow_name,
+            flow_update=True,
+            source_field=source_field,
+            filter_date=filter_date,
+            poll_interval=poll_interval,
+            aws_conn_id=aws_conn_id,
+            region=region,
+            **kwargs,
+        )
+
+    def _update_flow(self) -> None:
+        if not self.filter_date_parsed:
+            raise ValueError(f"Invalid filter_date argument parser value: {self.filter_date_parsed}")
+        if not self.source_field:
+            raise ValueError(f"Invalid source_field argument value: {self.source_field}")
+        filter_task: "TaskTypeDef" = {
+            "taskType": "Filter",
+            "connectorOperator": {self.connector_type: "GREATER_THAN"},  # type: ignore
+            "sourceFields": [self.source_field],
+            "taskProperties": {
+                "DATA_TYPE": "datetime",
+                "VALUE": str(datetime_to_epoch_ms(self.filter_date_parsed)),
+            },  # NOT inclusive
+        }
+        self.hook.update_flow_filter(
+            flow_name=self.flow_name, filter_tasks=[filter_task], set_trigger_ondemand=True
+        )
+
+
+class AppflowRunDailyOperator(AppflowBaseOperator):
+    """
+    Execute a Appflow run after updating the filters to select only a single day.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AppflowRunDailyOperator`
+
+    :param source: The source name (Supported: salesforce)
+    :param flow_name: The flow name
+    :param source_field: The field name to apply filters
+    :param filter_date: The date value (or template) to be used in filters.
+    :param poll_interval: how often in seconds to check the query status
+    :param aws_conn_id: aws connection to use
+    :param region: aws region to use
+    """
+
+    template_fields = ("filter_date",)
+
+    def __init__(
+        self,
+        source: str,
+        flow_name: str,
+        source_field: str,
+        filter_date: str,
+        poll_interval: int = 20,
+        aws_conn_id: str = "aws_default",
+        region: Optional[str] = None,
+        **kwargs,
+    ) -> None:
+        if not filter_date:
+            raise ValueError(MANDATORY_FILTER_DATE_MSG.format(entity="AppflowRunDailyOperator"))
+        if source != "salesforce":
+            raise ValueError(NOT_SUPPORTED_SOURCE_MSG.format(source=source, entity="AppflowRunDailyOperator"))
+        super().__init__(
+            source=source,
+            flow_name=flow_name,
+            flow_update=True,
+            source_field=source_field,
+            filter_date=filter_date,
+            poll_interval=poll_interval,
+            aws_conn_id=aws_conn_id,
+            region=region,
+            **kwargs,
+        )
+
+    def _update_flow(self) -> None:
+        if not self.filter_date_parsed:
+            raise ValueError(f"Invalid filter_date argument parser value: {self.filter_date_parsed}")
+        if not self.source_field:
+            raise ValueError(f"Invalid source_field argument value: {self.source_field}")
+        start_filter_date = self.filter_date_parsed - timedelta(milliseconds=1)
+        end_filter_date = self.filter_date_parsed + timedelta(days=1)
+        filter_task: "TaskTypeDef" = {
+            "taskType": "Filter",
+            "connectorOperator": {self.connector_type: "BETWEEN"},  # type: ignore
+            "sourceFields": [self.source_field],
+            "taskProperties": {
+                "DATA_TYPE": "datetime",
+                "LOWER_BOUND": str(datetime_to_epoch_ms(start_filter_date)),  # NOT inclusive
+                "UPPER_BOUND": str(datetime_to_epoch_ms(end_filter_date)),  # NOT inclusive
+            },
+        }
+        self.hook.update_flow_filter(
+            flow_name=self.flow_name, filter_tasks=[filter_task], set_trigger_ondemand=True
+        )
+
+
+class AppflowRecordsShortCircuitOperator(ShortCircuitOperator):
+    """
+    Short-circuit in case of a empty Appflow's run.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AppflowRecordsShortCircuitOperator`
+
+    :param flow_name: The flow name
+    :param appflow_run_task_id: Run task ID from where this operator should extract the execution ID
+    :param ignore_downstream_trigger_rules: Ignore downstream trigger rules (Ignored for Airflow < 2.3)
+    :param aws_conn_id: aws connection to use
+    :param region: aws region to use
+    """
+
+    ui_color = "#33ffec"  # Light blue
+
+    def __init__(
+        self,
+        *,
+        flow_name: str,
+        appflow_run_task_id: str,
+        ignore_downstream_trigger_rules: bool = True,
+        aws_conn_id: str = "aws_default",
+        region: Optional[str] = None,
+        **kwargs,
+    ) -> None:
+        if get_airflow_version() >= (2, 3):
+            kwargs["ignore_downstream_trigger_rules"] = ignore_downstream_trigger_rules
+        else:
+            self.log.warning(
+                "Ignoring argument ignore_downstream_trigger_rules (%s) - Only supported for Airflow >= 2.3",
+                ignore_downstream_trigger_rules,
+            )
+        super().__init__(
+            python_callable=self._has_new_records_func,
+            op_kwargs={
+                "flow_name": flow_name,
+                "appflow_run_task_id": appflow_run_task_id,
+            },
+            **kwargs,
+        )
+        self.aws_conn_id = aws_conn_id
+        self.region = region
+
+    @staticmethod
+    def _get_target_execution_id(
+        records: List["ExecutionRecordTypeDef"], execution_id: str
+    ) -> Optional["ExecutionRecordTypeDef"]:
+        for record in records:
+            if record.get("executionId") == execution_id:
+                return record
+        return None
+
+    @cached_property
+    def hook(self) -> AppflowHook:
+        """Create and return an AppflowHook."""
+        return AppflowHook(aws_conn_id=self.aws_conn_id, region_name=self.region)
+
+    def _has_new_records_func(self, **kwargs) -> bool:
+        appflow_task_id = kwargs["appflow_run_task_id"]
+        self.log.info("appflow_task_id: %s", appflow_task_id)
+        flow_name = kwargs["flow_name"]
+        self.log.info("flow_name: %s", flow_name)
+        af_client = self.hook.conn
+        task_instance = kwargs["task_instance"]
+        execution_id = task_instance.xcom_pull(task_ids=appflow_task_id, key="execution_id")  # type: ignore
+        if not execution_id:
+            raise AirflowException(f"No execution_id found from task_id {appflow_task_id}!")
+        self.log.info("execution_id: %s", execution_id)
+        args = {"flowName": flow_name, "maxResults": 100}
+        response: "DescribeFlowExecutionRecordsResponseTypeDef" = cast(
+            "DescribeFlowExecutionRecordsResponseTypeDef", {}
+        )
+        record = None
+
+        while not record:
+            if "nextToken" in response:
+                response = af_client.describe_flow_execution_records(nextToken=response["nextToken"], **args)
+            else:
+                response = af_client.describe_flow_execution_records(**args)
+            record = AppflowRecordsShortCircuitOperator._get_target_execution_id(
+                response["flowExecutions"], execution_id
+            )
+            if not record and "nextToken" not in response:
+                raise AirflowException(f"Flow ({execution_id}) without recordsProcessed info.")
+
+        execution = record.get("executionResult", {})
+        if "recordsProcessed" not in execution:
+            raise AirflowException(f"Flow ({execution_id}) without recordsProcessed info!")
+        records_processed = execution["recordsProcessed"]
+        self.log.info("records_processed: %d", records_processed)
+        task_instance.xcom_push("records_processed", records_processed)  # type: ignore
+        return records_processed > 0
diff --git a/airflow/providers/amazon/aws/operators/redshift_data.py b/airflow/providers/amazon/aws/operators/redshift_data.py
index f2d47da655..07854544a5 100644
--- a/airflow/providers/amazon/aws/operators/redshift_data.py
+++ b/airflow/providers/amazon/aws/operators/redshift_data.py
@@ -15,6 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 import sys
 from time import sleep
 from typing import TYPE_CHECKING, Any, Dict, Optional
diff --git a/airflow/providers/amazon/aws/secrets/secrets_manager.py 
b/airflow/providers/amazon/aws/secrets/secrets_manager.py
index 8b72f955ac..c740077a56 100644
--- a/airflow/providers/amazon/aws/secrets/secrets_manager.py
+++ b/airflow/providers/amazon/aws/secrets/secrets_manager.py
@@ -27,7 +27,7 @@ from urllib.parse import urlencode
 
 import boto3
 
-from airflow.version import version as airflow_version
+from airflow.providers.amazon.aws.utils import get_airflow_version
 
 if sys.version_info >= (3, 8):
     from functools import cached_property
@@ -214,7 +214,7 @@ class SecretsManagerBackend(BaseSecretsBackend, 
LoggingMixin):
         :param conn_id: the connection id
         :return: deserialized Connection
         """
-        if _parse_version(airflow_version) >= (2, 3):
+        if get_airflow_version() >= (2, 3):
             warnings.warn(
                 f"Method `{self.__class__.__name__}.get_conn_uri` is 
deprecated and will be removed "
                 "in a future release.  Please use method `get_conn_value` 
instead.",
diff --git a/airflow/providers/amazon/aws/secrets/systems_manager.py 
b/airflow/providers/amazon/aws/secrets/systems_manager.py
index e45a5500ab..140fec11ef 100644
--- a/airflow/providers/amazon/aws/secrets/systems_manager.py
+++ b/airflow/providers/amazon/aws/secrets/systems_manager.py
@@ -23,7 +23,7 @@ from typing import Optional
 
 import boto3
 
-from airflow.version import version as airflow_version
+from airflow.providers.amazon.aws.utils import get_airflow_version
 
 if sys.version_info >= (3, 8):
     from functools import cached_property
@@ -115,7 +115,7 @@ class 
SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
         :param conn_id: the connection id
         :return: deserialized Connection
         """
-        if _parse_version(airflow_version) >= (2, 3):
+        if get_airflow_version() >= (2, 3):
             warnings.warn(
                 f"Method `{self.__class__.__name__}.get_conn_uri` is 
deprecated and will be removed "
                 "in a future release.  Please use method `get_conn_value` 
instead.",
diff --git a/airflow/providers/amazon/aws/utils/__init__.py 
b/airflow/providers/amazon/aws/utils/__init__.py
index 13a83393a9..7f127f7178 100644
--- a/airflow/providers/amazon/aws/utils/__init__.py
+++ b/airflow/providers/amazon/aws/utils/__init__.py
@@ -14,3 +14,29 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+import re
+from datetime import datetime
+from typing import Tuple
+
+from airflow.version import version
+
+
+def datetime_to_epoch(date_time: datetime) -> int:
+    """Convert a datetime object to an epoch integer (seconds)."""
+    return int(date_time.timestamp())
+
+
+def datetime_to_epoch_ms(date_time: datetime) -> int:
+    """Convert a datetime object to an epoch integer (milliseconds)."""
+    return int(date_time.timestamp() * 1_000)
+
+
+def datetime_to_epoch_us(date_time: datetime) -> int:
+    """Convert a datetime object to an epoch integer (microseconds)."""
+    return int(date_time.timestamp() * 1_000_000)
+
+
+def get_airflow_version() -> Tuple[int, ...]:
+    val = re.sub(r'(\d+\.\d+\.\d+).*', lambda x: x.group(1), version)
+    return tuple(int(x) for x in val.split('.'))
diff --git a/airflow/providers/amazon/provider.yaml 
b/airflow/providers/amazon/provider.yaml
index 01fb7e9104..4ad53e028f 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -213,6 +213,12 @@ integrations:
     external-doc-url: 
https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html
     logo: /integration-logos/aws/[email protected]
     tags: [aws]
+  - integration-name: Amazon Appflow
+    external-doc-url: 
https://docs.aws.amazon.com/appflow/1.0/APIReference/Welcome.html
+    logo: /integration-logos/aws/Amazon_AppFlow_light.png
+    how-to-guide:
+      - /docs/apache-airflow-providers-amazon/operators/appflow.rst
+    tags: [aws]
 
 operators:
   - integration-name: Amazon Athena
@@ -311,6 +317,9 @@ operators:
   - integration-name: Amazon QuickSight
     python-modules:
       - airflow.providers.amazon.aws.operators.quicksight
+  - integration-name: Amazon Appflow
+    python-modules:
+      - airflow.providers.amazon.aws.operators.appflow
 
 sensors:
   - integration-name: Amazon Athena
@@ -475,6 +484,9 @@ hooks:
   - integration-name: AWS Security Token Service (STS)
     python-modules:
       - airflow.providers.amazon.aws.hooks.sts
+  - integration-name: Amazon Appflow
+    python-modules:
+      - airflow.providers.amazon.aws.hooks.appflow
 
 transfers:
   - source-integration-name: Amazon DynamoDB
diff --git a/docs/apache-airflow-providers-amazon/index.rst 
b/docs/apache-airflow-providers-amazon/index.rst
index 013dbd4dcc..7611caef8c 100644
--- a/docs/apache-airflow-providers-amazon/index.rst
+++ b/docs/apache-airflow-providers-amazon/index.rst
@@ -92,6 +92,7 @@ PIP package                   Version required
 ``jsonpath_ng``               ``>=1.5.3``
 ``mypy-boto3-rds``            ``>=1.21.0``
 ``mypy-boto3-redshift-data``  ``>=1.21.0``
+``mypy-boto3-appflow``        ``>=1.21.0``
 ``pandas``                    ``>=0.17.1``
 ``redshift_connector``        ``>=2.0.888``
 ``sqlalchemy_redshift``       ``>=0.8.6``
diff --git a/docs/apache-airflow-providers-amazon/operators/appflow.rst 
b/docs/apache-airflow-providers-amazon/operators/appflow.rst
new file mode 100644
index 0000000000..daf3d873dc
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/appflow.rst
@@ -0,0 +1,146 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+==============
+Amazon AppFlow
+==============
+
+`Amazon AppFlow <https://aws.amazon.com/appflow/>`__ is a fully managed 
integration service
+that enables you to securely transfer data between Software-as-a-Service 
(SaaS) applications
+like Salesforce, SAP, Zendesk, Slack, and ServiceNow, and AWS services like 
Amazon S3 and
+Amazon Redshift, in just a few clicks. With AppFlow, you can run data flows at 
enterprise
+scale at the frequency you choose - on a schedule, in response to a business 
event, or on
+demand. You can configure data transformation capabilities like filtering and 
validation to
+generate rich, ready-to-use data as part of the flow itself, without 
additional steps.
+AppFlow automatically encrypts data in motion, and allows users to restrict 
data from
+flowing over the public Internet for SaaS applications that are integrated with
+AWS PrivateLink, reducing exposure to security threats.
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+Operators
+---------
+
+Run Flow
+========
+
+To run an AppFlow flow keeping all filters as is, use:
+:class:`~airflow.providers.amazon.aws.operators.appflow.AppflowRunOperator`.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_appflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_appflow_run]
+    :end-before: [END howto_operator_appflow_run]
+
+.. note::
+  Supported sources: Salesforce, Zendesk
+
+.. _howto/operator:AppflowRunOperator:
+
+Run Flow Full
+=============
+
+To run an AppFlow flow removing all filters, use:
+:class:`~airflow.providers.amazon.aws.operators.appflow.AppflowRunFullOperator`.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_appflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_appflow_run_full]
+    :end-before: [END howto_operator_appflow_run_full]
+
+.. note::
+  Supported sources: Salesforce, Zendesk
+
+.. _howto/operator:AppflowRunFullOperator:
+
+Run Flow Daily
+==============
+
+To run an AppFlow flow filtering daily records, use:
+:class:`~airflow.providers.amazon.aws.operators.appflow.AppflowRunDailyOperator`.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_appflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_appflow_run_daily]
+    :end-before: [END howto_operator_appflow_run_daily]
+
+.. note::
+  Supported sources: Salesforce
+
+.. _howto/operator:AppflowRunDailyOperator:
+
+Run Flow Before
+===============
+
+To run an AppFlow flow filtering out future records and selecting the past ones, 
use:
+:class:`~airflow.providers.amazon.aws.operators.appflow.AppflowRunBeforeOperator`.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_appflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_appflow_run_before]
+    :end-before: [END howto_operator_appflow_run_before]
+
+.. note::
+  Supported sources: Salesforce
+
+.. _howto/operator:AppflowRunBeforeOperator:
+
+Run Flow After
+==============
+
+To run an AppFlow flow filtering out past records and selecting the future ones, 
use:
+:class:`~airflow.providers.amazon.aws.operators.appflow.AppflowRunAfterOperator`.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_appflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_appflow_run_after]
+    :end-before: [END howto_operator_appflow_run_after]
+
+.. note::
+  Supported sources: Salesforce, Zendesk
+
+.. _howto/operator:AppflowRunAfterOperator:
+
+Skipping Tasks For Empty Runs
+=============================
+
+To skip tasks when an AppFlow run returns zero records, use:
+:class:`~airflow.providers.amazon.aws.operators.appflow.AppflowRecordsShortCircuitOperator`.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_appflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_appflow_shortcircuit]
+    :end-before: [END howto_operator_appflow_shortcircuit]
+
+.. note::
+  Supported sources: Salesforce, Zendesk
+
+.. _howto/operator:AppflowRecordsShortCircuitOperator:
+
+Reference
+---------
+
+* `AWS boto3 library documentation for Amazon AppFlow 
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/appflow.html>`__
diff --git a/docs/integration-logos/aws/Amazon_AppFlow_light.png 
b/docs/integration-logos/aws/Amazon_AppFlow_light.png
new file mode 100644
index 0000000000..fa1d4c64d6
Binary files /dev/null and 
b/docs/integration-logos/aws/Amazon_AppFlow_light.png differ
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index b43440f9ee..5bee2e2783 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -17,6 +17,7 @@ Aneesh
 AnnotateTextResponse
 Ansible
 AppBuilder
+Appflow
 ArangoDB
 Arg
 Args
diff --git a/setup.py b/setup.py
index ebcf944a69..3edaf59c6a 100644
--- a/setup.py
+++ b/setup.py
@@ -200,6 +200,7 @@ amazon = [
     pandas_requirement,
     'mypy-boto3-rds>=1.21.0',
     'mypy-boto3-redshift-data>=1.21.0',
+    'mypy-boto3-appflow>=1.21.0',
 ]
 apache_beam = [
     'apache-beam>=2.39.0',
diff --git a/tests/always/test_project_structure.py 
b/tests/always/test_project_structure.py
index d0b9e1c060..bddb425f07 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -430,6 +430,7 @@ class 
TestAmazonProviderProjectStructure(ExampleCoverageTest):
         'airflow.providers.amazon.aws.sensors.emr.EmrBaseSensor',
         'airflow.providers.amazon.aws.sensors.rds.RdsBaseSensor',
         'airflow.providers.amazon.aws.sensors.sagemaker.SageMakerBaseSensor',
+        'airflow.providers.amazon.aws.operators.appflow.AppflowBaseOperator',
     }
 
     MISSING_EXAMPLES_FOR_CLASSES = {
diff --git a/tests/providers/amazon/aws/hooks/test_appflow.py 
b/tests/providers/amazon/aws/hooks/test_appflow.py
new file mode 100644
index 0000000000..f50e6e9a86
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_appflow.py
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime
+from unittest import mock
+from unittest.mock import ANY
+
+import pytest
+
+from airflow.providers.amazon.aws.hooks.appflow import AppflowHook
+from airflow.utils import timezone
+
+FLOW_NAME = "flow0"
+EXECUTION_ID = "ex_id"
+CONNECTION_TYPE = "Salesforce"
+REGION_NAME = "us-east-1"
+AWS_CONN_ID = "aws_default"
+
+
[email protected]
+def hook():
+    with 
mock.patch("airflow.providers.amazon.aws.hooks.appflow.AppflowHook.__init__", 
return_value=None):
+        with 
mock.patch("airflow.providers.amazon.aws.hooks.appflow.AppflowHook.conn") as 
mock_conn:
+            mock_conn.describe_flow.return_value = {
+                'sourceFlowConfig': {'connectorType': CONNECTION_TYPE},
+                'tasks': [],
+                'triggerConfig': {'triggerProperties': None},
+                'flowName': FLOW_NAME,
+                'destinationFlowConfigList': {},
+                'lastRunExecutionDetails': {
+                    'mostRecentExecutionStatus': 'Successful',
+                    'mostRecentExecutionTime': datetime(3000, 1, 1, 
tzinfo=timezone.utc),
+                },
+            }
+            mock_conn.update_flow.return_value = {}
+            mock_conn.start_flow.return_value = {"executionId": EXECUTION_ID}
+            mock_conn.describe_flow_execution_records.return_value = {
+                "flowExecutions": [{"executionId": EXECUTION_ID, 
"executionResult": {"recordsProcessed": 1}}]
+            }
+            yield AppflowHook(aws_conn_id=AWS_CONN_ID, region_name=REGION_NAME)
+
+
+def test_conn_attributes(hook):
+    assert hasattr(hook, 'conn')
+    conn = hook.conn
+    assert conn is hook.conn, "AppflowHook conn property non-cached"
+
+
+def test_run_flow(hook):
+    hook.run_flow(flow_name=FLOW_NAME)
+    hook.conn.describe_flow.assert_called_with(flowName=FLOW_NAME)
+    assert hook.conn.describe_flow.call_count == 1
+    hook.conn.start_flow.assert_called_once_with(flowName=FLOW_NAME)
+
+
+def test_update_flow_filter(hook):
+    tasks = [
+        {
+            'taskType': 'Filter',
+            'connectorOperator': {'Salesforce': 'GREATER_THAN'},
+            'sourceFields': ['col0'],
+            'taskProperties': {'DATA_TYPE': 'datetime', 'VALUE': 
'1653523200000'},
+        }
+    ]
+    hook.update_flow_filter(flow_name=FLOW_NAME, filter_tasks=tasks, 
set_trigger_ondemand=True)
+    hook.conn.describe_flow.assert_called_with(flowName=FLOW_NAME)
+    assert hook.conn.describe_flow.call_count == 1
+    hook.conn.update_flow.assert_called_once_with(
+        flowName=FLOW_NAME,
+        tasks=tasks,
+        description=ANY,
+        destinationFlowConfigList=ANY,
+        sourceFlowConfig=ANY,
+        triggerConfig=ANY,
+    )
diff --git a/tests/providers/amazon/aws/operators/test_appflow.py 
b/tests/providers/amazon/aws/operators/test_appflow.py
new file mode 100644
index 0000000000..7bbd515693
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_appflow.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime
+from unittest import mock
+from unittest.mock import ANY
+
+import pytest
+
+from airflow.providers.amazon.aws.operators.appflow import (
+    AppflowRecordsShortCircuitOperator,
+    AppflowRunAfterOperator,
+    AppflowRunBeforeOperator,
+    AppflowRunDailyOperator,
+    AppflowRunFullOperator,
+    AppflowRunOperator,
+)
+from airflow.utils import timezone
+
+CONN_ID = "aws_default"
+DAG_ID = "dag_id"
+TASK_ID = "task_id"
+SHORT_CIRCUIT_TASK_ID = "short_circuit_task_id"
+FLOW_NAME = "flow0"
+EXECUTION_ID = "ex_id"
+CONNECTION_TYPE = "Salesforce"
+SOURCE = "salesforce"
+
+DUMP_COMMON_ARGS = {"aws_conn_id": CONN_ID, "task_id": TASK_ID, "source": 
SOURCE, "flow_name": FLOW_NAME}
+
+
[email protected]
+def ctx(create_task_instance):
+    ti = create_task_instance(
+        dag_id=DAG_ID,
+        task_id=TASK_ID,
+        schedule_interval="0 12 * * *",
+    )
+    yield {"task_instance": ti}
+
+
[email protected]
+def appflow_conn():
+    with 
mock.patch("airflow.providers.amazon.aws.hooks.appflow.AppflowHook.conn") as 
mock_conn:
+        mock_conn.describe_flow.return_value = {
+            'sourceFlowConfig': {'connectorType': CONNECTION_TYPE},
+            'tasks': [],
+            'triggerConfig': {'triggerProperties': None},
+            'flowName': FLOW_NAME,
+            'destinationFlowConfigList': {},
+            'lastRunExecutionDetails': {
+                'mostRecentExecutionStatus': 'Successful',
+                'mostRecentExecutionTime': datetime(3000, 1, 1, 
tzinfo=timezone.utc),
+            },
+        }
+        mock_conn.update_flow.return_value = {}
+        mock_conn.start_flow.return_value = {"executionId": EXECUTION_ID}
+        mock_conn.describe_flow_execution_records.return_value = {
+            "flowExecutions": [{"executionId": EXECUTION_ID, 
"executionResult": {"recordsProcessed": 1}}]
+        }
+        yield mock_conn
+
+
+def run_assertions_base(appflow_conn, tasks):
+    appflow_conn.describe_flow.assert_called_with(flowName=FLOW_NAME)
+    assert appflow_conn.describe_flow.call_count == 3
+    appflow_conn.update_flow.assert_called_once_with(
+        flowName=FLOW_NAME,
+        tasks=tasks,
+        description=ANY,
+        destinationFlowConfigList=ANY,
+        sourceFlowConfig=ANY,
+        triggerConfig=ANY,
+    )
+    appflow_conn.start_flow.assert_called_once_with(flowName=FLOW_NAME)
+
+
+def test_run(appflow_conn, ctx):
+    operator = AppflowRunOperator(**DUMP_COMMON_ARGS)
+    operator.execute(ctx)  # type: ignore
+    appflow_conn.describe_flow.assert_called_with(flowName=FLOW_NAME)
+    assert appflow_conn.describe_flow.call_count == 2
+    appflow_conn.start_flow.assert_called_once_with(flowName=FLOW_NAME)
+
+
+def test_run_full(appflow_conn, ctx):
+    operator = AppflowRunFullOperator(**DUMP_COMMON_ARGS)
+    operator.execute(ctx)  # type: ignore
+    run_assertions_base(appflow_conn, [])
+
+
+def test_run_after(appflow_conn, ctx):
+    operator = AppflowRunAfterOperator(source_field="col0", 
filter_date="2022-05-26", **DUMP_COMMON_ARGS)
+    operator.execute(ctx)  # type: ignore
+    run_assertions_base(
+        appflow_conn,
+        [
+            {
+                'taskType': 'Filter',
+                'connectorOperator': {'Salesforce': 'GREATER_THAN'},
+                'sourceFields': ['col0'],
+                'taskProperties': {'DATA_TYPE': 'datetime', 'VALUE': 
'1653523200000'},
+            }
+        ],
+    )
+
+
+def test_run_before(appflow_conn, ctx):
+    operator = AppflowRunBeforeOperator(source_field="col0", 
filter_date="2022-05-26", **DUMP_COMMON_ARGS)
+    operator.execute(ctx)  # type: ignore
+    run_assertions_base(
+        appflow_conn,
+        [
+            {
+                'taskType': 'Filter',
+                'connectorOperator': {'Salesforce': 'LESS_THAN'},
+                'sourceFields': ['col0'],
+                'taskProperties': {'DATA_TYPE': 'datetime', 'VALUE': 
'1653523200000'},
+            }
+        ],
+    )
+
+
+def test_run_daily(appflow_conn, ctx):
+    operator = AppflowRunDailyOperator(source_field="col0", 
filter_date="2022-05-26", **DUMP_COMMON_ARGS)
+    operator.execute(ctx)  # type: ignore
+    run_assertions_base(
+        appflow_conn,
+        [
+            {
+                'taskType': 'Filter',
+                'connectorOperator': {'Salesforce': 'BETWEEN'},
+                'sourceFields': ['col0'],
+                'taskProperties': {
+                    'DATA_TYPE': 'datetime',
+                    'LOWER_BOUND': '1653523199999',
+                    'UPPER_BOUND': '1653609600000',
+                },
+            }
+        ],
+    )
+
+
+def test_short_circuit(appflow_conn, ctx):
+    with mock.patch("airflow.models.TaskInstance.xcom_pull") as mock_xcom_pull:
+        with mock.patch("airflow.models.TaskInstance.xcom_push") as 
mock_xcom_push:
+            mock_xcom_pull.return_value = EXECUTION_ID
+            operator = AppflowRecordsShortCircuitOperator(
+                task_id=SHORT_CIRCUIT_TASK_ID,
+                flow_name=FLOW_NAME,
+                appflow_run_task_id=TASK_ID,
+            )
+            operator.execute(ctx)  # type: ignore
+            
appflow_conn.describe_flow_execution_records.assert_called_once_with(
+                flowName=FLOW_NAME, maxResults=100
+            )
+            mock_xcom_push.assert_called_with("records_processed", 1)
diff --git a/airflow/providers/amazon/aws/utils/__init__.py 
b/tests/providers/amazon/aws/utils/test_utils.py
similarity index 58%
copy from airflow/providers/amazon/aws/utils/__init__.py
copy to tests/providers/amazon/aws/utils/test_utils.py
index 13a83393a9..9a6bc198c1 100644
--- a/airflow/providers/amazon/aws/utils/__init__.py
+++ b/tests/providers/amazon/aws/utils/test_utils.py
@@ -14,3 +14,31 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+from datetime import datetime
+
+from airflow.providers.amazon.aws.utils import (
+    datetime_to_epoch,
+    datetime_to_epoch_ms,
+    datetime_to_epoch_us,
+    get_airflow_version,
+)
+
+DT = datetime(2000, 1, 1)
+EPOCH = 946_684_800
+
+
+def test_datetime_to_epoch():
+    assert datetime_to_epoch(DT) == EPOCH
+
+
+def test_datetime_to_epoch_ms():
+    assert datetime_to_epoch_ms(DT) == EPOCH * 1000
+
+
+def test_datetime_to_epoch_us():
+    assert datetime_to_epoch_us(DT) == EPOCH * 1_000_000
+
+
+def test_get_airflow_version():
+    assert len(get_airflow_version()) == 3

Reply via email to