This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b3ebafdce Implement `is_authorized_variable` in AWS auth manager 
(#35804)
3b3ebafdce is described below

commit 3b3ebafdce440952d2406955de290092ca0e361d
Author: Vincent <97131062+vincb...@users.noreply.github.com>
AuthorDate: Mon Nov 27 16:11:09 2023 -0500

    Implement `is_authorized_variable` in AWS auth manager (#35804)
---
 .../auth_manager/{constants.py => avp/__init__.py} |   7 -
 .../amazon/aws/auth_manager/avp/entities.py        |  57 ++++++
 .../amazon/aws/auth_manager/avp/facade.py          | 126 +++++++++++++
 .../amazon/aws/auth_manager/aws_auth_manager.py    |  14 +-
 .../providers/amazon/aws/auth_manager/constants.py |   4 +-
 airflow/providers/amazon/aws/auth_manager/user.py  |   3 +
 .../amazon/aws/hooks/verified_permissions.py       |  44 +++++
 airflow/providers/amazon/provider.yaml             |  23 +++
 airflow/www/auth.py                                |  23 ++-
 .../aws/Amazon-Verified-Permissions.png            | Bin 0 -> 13986 bytes
 .../amazon/aws/auth_manager/avp/__init__.py        |   7 -
 .../amazon/aws/auth_manager/avp/test_entities.py   |  14 +-
 .../amazon/aws/auth_manager/avp/test_facade.py     | 203 +++++++++++++++++++++
 .../aws/auth_manager/test_aws_auth_manager.py      |  38 +++-
 .../amazon/aws/auth_manager/test_constants.py      |  12 +-
 .../providers/amazon/aws/auth_manager/test_user.py |   3 +
 .../amazon/aws/hooks/test_verified_permissions.py  |  12 +-
 17 files changed, 549 insertions(+), 41 deletions(-)

diff --git a/airflow/providers/amazon/aws/auth_manager/constants.py 
b/airflow/providers/amazon/aws/auth_manager/avp/__init__.py
similarity index 81%
copy from airflow/providers/amazon/aws/auth_manager/constants.py
copy to airflow/providers/amazon/aws/auth_manager/avp/__init__.py
index f2f9c1da07..13a83393a9 100644
--- a/airflow/providers/amazon/aws/auth_manager/constants.py
+++ b/airflow/providers/amazon/aws/auth_manager/avp/__init__.py
@@ -14,10 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-# Configuration keys
-from __future__ import annotations
-
-CONF_SECTION_NAME = "aws_auth_manager"
-CONF_SAML_METADATA_URL_KEY = "saml_metadata_url"
-CONF_ENABLE_KEY = "enable"
diff --git a/airflow/providers/amazon/aws/auth_manager/avp/entities.py 
b/airflow/providers/amazon/aws/auth_manager/avp/entities.py
new file mode 100644
index 0000000000..fad5ee1c3f
--- /dev/null
+++ b/airflow/providers/amazon/aws/auth_manager/avp/entities.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from enum import Enum
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from airflow.auth.managers.base_auth_manager import ResourceMethod
+
+AVP_PREFIX_ENTITIES = "Airflow::"
+
+
+class AvpEntities(Enum):
+    """Enum of Amazon Verified Permissions entities."""
+
+    ACTION = "Action"
+    ROLE = "Role"
+    VARIABLE = "Variable"
+    USER = "User"
+
+
+def get_entity_type(resource_type: AvpEntities) -> str:
+    """
+    Return entity type.
+
+    :param resource_type: Resource type.
+
+    Example: Airflow::Action, Airflow::Role, Airflow::Variable, Airflow::User.
+    """
+    return AVP_PREFIX_ENTITIES + resource_type.value
+
+
+def get_action_id(resource_type: AvpEntities, method: ResourceMethod):
+    """
+    Return action id.
+
+    Convention for action ID is <resource_type>::<method>. Example: 
Variable::GET.
+
+    :param resource_type: Resource type.
+    :param method: Resource method.
+    """
+    return f"{resource_type.value}::{method}"
diff --git a/airflow/providers/amazon/aws/auth_manager/avp/facade.py 
b/airflow/providers/amazon/aws/auth_manager/avp/facade.py
new file mode 100644
index 0000000000..63ed9f5c70
--- /dev/null
+++ b/airflow/providers/amazon/aws/auth_manager/avp/facade.py
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Callable
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.auth_manager.avp.entities import 
AvpEntities, get_action_id, get_entity_type
+from airflow.providers.amazon.aws.auth_manager.constants import (
+    CONF_AVP_POLICY_STORE_ID_KEY,
+    CONF_CONN_ID_KEY,
+    CONF_SECTION_NAME,
+)
+from airflow.providers.amazon.aws.hooks.verified_permissions import 
VerifiedPermissionsHook
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if TYPE_CHECKING:
+    from airflow.auth.managers.base_auth_manager import ResourceMethod
+    from airflow.providers.amazon.aws.auth_manager.user import 
AwsAuthManagerUser
+
+
+class AwsAuthManagerAmazonVerifiedPermissionsFacade(LoggingMixin):
+    """
+    Facade for Amazon Verified Permissions.
+
+    Used as an intermediate layer between AWS auth manager and Amazon Verified 
Permissions.
+    """
+
+    @cached_property
+    def avp_client(self):
+        """Build Amazon Verified Permissions client."""
+        aws_conn_id = conf.get(CONF_SECTION_NAME, CONF_CONN_ID_KEY)
+        return VerifiedPermissionsHook(aws_conn_id=aws_conn_id).conn
+
+    @cached_property
+    def avp_policy_store_id(self):
+        """Get the Amazon Verified Permissions policy store ID from config."""
+        return conf.get_mandatory_value(CONF_SECTION_NAME, 
CONF_AVP_POLICY_STORE_ID_KEY)
+
+    def is_authorized(
+        self,
+        *,
+        method: ResourceMethod,
+        entity_type: AvpEntities,
+        user: AwsAuthManagerUser,
+        entity_id: str | None = None,
+        entity_fetcher: Callable | None = None,
+    ) -> bool:
+        """
+        Make an authorization decision against Amazon Verified Permissions.
+
+        Check whether the user has permissions to access given resource.
+
+        :param method: the method to perform
+        :param entity_type: the entity type the user accesses
+        :param user: the user
+        :param entity_id: the entity ID the user accesses. If not provided, 
all entities of the type will be
+            considered.
+        :param entity_fetcher: function that returns list of entities to be 
passed to Amazon Verified
+            Permissions. Only needed if some resource properties are used in 
the policies (e.g. DAG folder).
+        """
+        entity_list = self._get_user_role_entities(user)
+        if entity_fetcher and entity_id:
+            # If no entity ID is provided, there is no need to fetch entities.
+            # We just need to know whether the user has permissions to access 
all resources from this type
+            entity_list += entity_fetcher()
+
+        self.log.debug(
+            "Making authorization request for user=%s, method=%s, 
entity_type=%s, entity_id=%s",
+            user.get_id(),
+            method,
+            entity_type,
+            entity_id,
+        )
+
+        resp = self.avp_client.is_authorized(
+            policyStoreId=self.avp_policy_store_id,
+            principal={"entityType": get_entity_type(AvpEntities.USER), 
"entityId": user.get_id()},
+            action={
+                "actionType": get_entity_type(AvpEntities.ACTION),
+                "actionId": get_action_id(entity_type, method),
+            },
+            resource={"entityType": get_entity_type(entity_type), "entityId": 
entity_id or "*"},
+            entities={"entityList": entity_list},
+        )
+
+        self.log.debug("Authorization response: %s", resp)
+
+        if len(resp.get("errors", [])) > 0:
+            self.log.error(
+                "Error occurred while making an authorization decision. 
Errors: %s", resp["errors"]
+            )
+            raise AirflowException("Error occurred while making an 
authorization decision.")
+
+        return resp["decision"] == "ALLOW"
+
+    @staticmethod
+    def _get_user_role_entities(user: AwsAuthManagerUser) -> list[dict]:
+        user_entity = {
+            "identifier": {"entityType": get_entity_type(AvpEntities.USER), 
"entityId": user.get_id()},
+            "parents": [
+                {"entityType": get_entity_type(AvpEntities.ROLE), "entityId": 
group}
+                for group in user.get_groups()
+            ],
+        }
+        role_entities = [
+            {"identifier": {"entityType": get_entity_type(AvpEntities.ROLE), 
"entityId": group}}
+            for group in user.get_groups()
+        ]
+        return [user_entity, *role_entities]
diff --git a/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py 
b/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py
index fcf3caacb9..d552662532 100644
--- a/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py
+++ b/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py
@@ -23,6 +23,8 @@ from flask import session, url_for
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowOptionalProviderFeatureException
+from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities
+from airflow.providers.amazon.aws.auth_manager.avp.facade import 
AwsAuthManagerAmazonVerifiedPermissionsFacade
 from airflow.providers.amazon.aws.auth_manager.constants import (
     CONF_ENABLE_KEY,
     CONF_SECTION_NAME,
@@ -72,6 +74,10 @@ class AwsAuthManager(BaseAuthManager):
                 "The AWS auth manager is currently being built. It is not 
finalized. It is not intended to be used yet."
             )
 
+    @cached_property
+    def avp_facade(self):
+        return AwsAuthManagerAmazonVerifiedPermissionsFacade()
+
     def get_user(self) -> AwsAuthManagerUser | None:
         return session["aws_user"] if self.is_logged_in() else None
 
@@ -122,7 +128,13 @@ class AwsAuthManager(BaseAuthManager):
     def is_authorized_variable(
         self, *, method: ResourceMethod, details: VariableDetails | None = 
None, user: BaseUser | None = None
     ) -> bool:
-        return self.is_logged_in()
+        variable_key = details.key if details else None
+        return self.avp_facade.is_authorized(
+            method=method,
+            entity_type=AvpEntities.VARIABLE,
+            user=user or self.get_user(),
+            entity_id=variable_key,
+        )
 
     def is_authorized_view(
         self,
diff --git a/airflow/providers/amazon/aws/auth_manager/constants.py 
b/airflow/providers/amazon/aws/auth_manager/constants.py
index f2f9c1da07..1ad2633f35 100644
--- a/airflow/providers/amazon/aws/auth_manager/constants.py
+++ b/airflow/providers/amazon/aws/auth_manager/constants.py
@@ -18,6 +18,8 @@
 # Configuration keys
 from __future__ import annotations
 
+CONF_ENABLE_KEY = "enable"
 CONF_SECTION_NAME = "aws_auth_manager"
+CONF_CONN_ID_KEY = "conn_id"
 CONF_SAML_METADATA_URL_KEY = "saml_metadata_url"
-CONF_ENABLE_KEY = "enable"
+CONF_AVP_POLICY_STORE_ID_KEY = "avp_policy_store_id"
diff --git a/airflow/providers/amazon/aws/auth_manager/user.py 
b/airflow/providers/amazon/aws/auth_manager/user.py
index 68699783d6..da3dab8ce5 100644
--- a/airflow/providers/amazon/aws/auth_manager/user.py
+++ b/airflow/providers/amazon/aws/auth_manager/user.py
@@ -49,3 +49,6 @@ class AwsAuthManagerUser(BaseUser):
 
     def get_name(self) -> str:
         return self.username or self.email or self.user_id
+
+    def get_groups(self):
+        return self.groups
diff --git a/airflow/providers/amazon/aws/hooks/verified_permissions.py 
b/airflow/providers/amazon/aws/hooks/verified_permissions.py
new file mode 100644
index 0000000000..8c4bb7e90c
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/verified_permissions.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
+
+if TYPE_CHECKING:
+    from mypy_boto3_verifiedpermissions.client import 
VerifiedPermissionsClient  # noqa
+
+
+class VerifiedPermissionsHook(AwsGenericHook["VerifiedPermissionsClient"]):
+    """
+    Interact with Amazon Verified Permissions.
+
+    Provide thin wrapper around 
:external+boto3:py:class:`boto3.client("verifiedpermissions")
+    <VerifiedPermissions.Client>`.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+        - `Amazon Verified Permissions API Reference 
<https://docs.aws.amazon.com/verifiedpermissions/latest/apireference/Welcome.html>`__
+    """
+
+    def __init__(self, *args, **kwargs) -> None:
+        kwargs["client_type"] = "verifiedpermissions"
+        super().__init__(*args, **kwargs)
diff --git a/airflow/providers/amazon/provider.yaml 
b/airflow/providers/amazon/provider.yaml
index d68850e437..d6869683d0 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -302,6 +302,10 @@ integrations:
       - /docs/apache-airflow-providers-amazon/operators/glue_databrew.rst
     logo: /integration-logos/aws/AWS-Glue-DataBrew_64.png
     tags: [aws]
+  - integration-name: Amazon Verified Permissions
+    external-doc-url: https://aws.amazon.com/verified-permissions/
+    logo: /integration-logos/aws/Amazon-Verified-Permissions.png
+    tags: [aws]
 
 operators:
   - integration-name: Amazon Athena
@@ -563,6 +567,9 @@ hooks:
   - integration-name: AWS Glue DataBrew
     python-modules:
       - airflow.providers.amazon.aws.hooks.glue_databrew
+  - integration-name: Amazon Verified Permissions
+    python-modules:
+      - airflow.providers.amazon.aws.hooks.verified_permissions
 
 triggers:
   - integration-name: Amazon Web Services
@@ -915,6 +922,14 @@ config:
         type: boolean
         example: "True"
         default: "False"
+      conn_id:
+        description: |
+          The Airflow connection (i.e. credentials) used by the AWS auth 
manager to make API calls to AWS
+          Identity Center and Amazon Verified Permissions.
+        version_added: 8.12.0
+        type: string
+        example: "aws_default"
+        default: "aws_default"
       saml_metadata_url:
         description: |
           SAML metadata XML file provided by AWS Identity Center.
@@ -923,6 +938,14 @@ config:
         type: string
         example: 
"https://portal.sso.<region>.amazonaws.com/saml/metadata/XXXXXXXXXX"
         default: ~
+      avp_policy_store_id:
+        description: |
+          Amazon Verified Permissions' policy store ID where all the policies 
defining user permissions
+          in Airflow are stored. Required.
+        version_added: 8.12.0
+        type: string
+        example: ~
+        default: ~
 
 executors:
   - airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor
diff --git a/airflow/www/auth.py b/airflow/www/auth.py
index 1ad6e6dab5..bcb98d9cbe 100644
--- a/airflow/www/auth.py
+++ b/airflow/www/auth.py
@@ -22,7 +22,7 @@ import warnings
 from functools import wraps
 from typing import TYPE_CHECKING, Callable, Sequence, TypeVar, cast
 
-from flask import flash, redirect, render_template, request, url_for
+from flask import flash, redirect, render_template, request
 from flask_appbuilder._compat import as_unicode
 from flask_appbuilder.const import (
     FLAMSG_ERR_SEC_ACCESS_DENIED,
@@ -107,21 +107,20 @@ def has_access_with_pk(f):
             _permission_name = self.method_permission_name.get(f.__name__)
             if _permission_name:
                 permission_str = f"{PERMISSION_PREFIX}{_permission_name}"
-        if permission_str in self.base_permissions and 
self.appbuilder.sm.has_access(
-            action_name=permission_str,
-            resource_name=self.class_permission_name,
-            resource_pk=kwargs.get("pk"),
+        if (
+            get_auth_manager().is_logged_in()
+            and permission_str in self.base_permissions
+            and self.appbuilder.sm.has_access(
+                action_name=permission_str,
+                resource_name=self.class_permission_name,
+                resource_pk=kwargs.get("pk"),
+            )
         ):
             return f(self, *args, **kwargs)
         else:
-            log.warning(LOGMSG_ERR_SEC_ACCESS_DENIED.format(permission_str, 
self.__class__.__name__))
+            log.warning(LOGMSG_ERR_SEC_ACCESS_DENIED, permission_str, 
self.__class__.__name__)
             flash(as_unicode(FLAMSG_ERR_SEC_ACCESS_DENIED), "danger")
-        return redirect(
-            url_for(
-                self.appbuilder.sm.auth_view.__class__.__name__ + ".login",
-                next=request.url,
-            )
-        )
+        return redirect(get_auth_manager().get_url_login(next=request.url))
 
     f._permission_name = permission_str
     return functools.update_wrapper(wraps, f)
diff --git a/docs/integration-logos/aws/Amazon-Verified-Permissions.png 
b/docs/integration-logos/aws/Amazon-Verified-Permissions.png
new file mode 100644
index 0000000000..a68a7a3b6a
Binary files /dev/null and 
b/docs/integration-logos/aws/Amazon-Verified-Permissions.png differ
diff --git a/airflow/providers/amazon/aws/auth_manager/constants.py 
b/tests/providers/amazon/aws/auth_manager/avp/__init__.py
similarity index 81%
copy from airflow/providers/amazon/aws/auth_manager/constants.py
copy to tests/providers/amazon/aws/auth_manager/avp/__init__.py
index f2f9c1da07..13a83393a9 100644
--- a/airflow/providers/amazon/aws/auth_manager/constants.py
+++ b/tests/providers/amazon/aws/auth_manager/avp/__init__.py
@@ -14,10 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-# Configuration keys
-from __future__ import annotations
-
-CONF_SECTION_NAME = "aws_auth_manager"
-CONF_SAML_METADATA_URL_KEY = "saml_metadata_url"
-CONF_ENABLE_KEY = "enable"
diff --git a/airflow/providers/amazon/aws/auth_manager/constants.py 
b/tests/providers/amazon/aws/auth_manager/avp/test_entities.py
similarity index 72%
copy from airflow/providers/amazon/aws/auth_manager/constants.py
copy to tests/providers/amazon/aws/auth_manager/avp/test_entities.py
index f2f9c1da07..c5b512f22c 100644
--- a/airflow/providers/amazon/aws/auth_manager/constants.py
+++ b/tests/providers/amazon/aws/auth_manager/avp/test_entities.py
@@ -14,10 +14,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-# Configuration keys
 from __future__ import annotations
 
-CONF_SECTION_NAME = "aws_auth_manager"
-CONF_SAML_METADATA_URL_KEY = "saml_metadata_url"
-CONF_ENABLE_KEY = "enable"
+from airflow.providers.amazon.aws.auth_manager.avp.entities import 
AvpEntities, get_action_id, get_entity_type
+
+
+def test_get_entity_type():
+    assert get_entity_type(AvpEntities.VARIABLE) == "Airflow::Variable"
+
+
+def test_get_action_id():
+    assert get_action_id(AvpEntities.VARIABLE, "GET") == "Variable::GET"
diff --git a/tests/providers/amazon/aws/auth_manager/avp/test_facade.py 
b/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
new file mode 100644
index 0000000000..aa093287c9
--- /dev/null
+++ b/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
@@ -0,0 +1,203 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+from unittest.mock import Mock
+
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.auth_manager.avp.entities import 
AvpEntities, get_action_id, get_entity_type
+from airflow.providers.amazon.aws.auth_manager.avp.facade import 
AwsAuthManagerAmazonVerifiedPermissionsFacade
+from airflow.providers.amazon.aws.auth_manager.user import AwsAuthManagerUser
+from tests.test_utils.config import conf_vars
+
+if TYPE_CHECKING:
+    from airflow.auth.managers.base_auth_manager import ResourceMethod
+
+AVP_POLICY_STORE_ID = "store_id"
+
+test_user = AwsAuthManagerUser(user_id="test_user", groups=["group1", 
"group2"])
+test_user_no_group = AwsAuthManagerUser(user_id="test_user_no_group", 
groups=[])
+
+
+def simple_entity_fetcher():
+    return [
+        {"identifier": {"entityType": "Airflow::Variable", "entityId": 
"var1"}},
+        {"identifier": {"entityType": "Airflow::Variable", "entityId": 
"var2"}},
+    ]
+
+
+@pytest.fixture
+def facade():
+    return AwsAuthManagerAmazonVerifiedPermissionsFacade()
+
+
+class TestAwsAuthManagerAmazonVerifiedPermissionsFacade:
+    def test_avp_client(self, facade):
+        assert hasattr(facade, "avp_client")
+
+    def test_avp_policy_store_id(self, facade):
+        with conf_vars(
+            {
+                ("aws_auth_manager", "avp_policy_store_id"): 
AVP_POLICY_STORE_ID,
+            }
+        ):
+            assert hasattr(facade, "avp_policy_store_id")
+
+    @pytest.mark.parametrize(
+        "entity_id, user, entity_fetcher, expected_entities, avp_response, 
expected",
+        [
+            # User with groups with no permissions
+            (
+                None,
+                test_user,
+                None,
+                [
+                    {
+                        "identifier": {"entityType": "Airflow::User", 
"entityId": "test_user"},
+                        "parents": [
+                            {"entityType": "Airflow::Role", "entityId": 
"group1"},
+                            {"entityType": "Airflow::Role", "entityId": 
"group2"},
+                        ],
+                    },
+                    {
+                        "identifier": {"entityType": "Airflow::Role", 
"entityId": "group1"},
+                    },
+                    {
+                        "identifier": {"entityType": "Airflow::Role", 
"entityId": "group2"},
+                    },
+                ],
+                {"decision": "DENY"},
+                False,
+            ),
+            # User with groups with permissions
+            (
+                "dummy_id",
+                test_user,
+                None,
+                [
+                    {
+                        "identifier": {"entityType": "Airflow::User", 
"entityId": "test_user"},
+                        "parents": [
+                            {"entityType": "Airflow::Role", "entityId": 
"group1"},
+                            {"entityType": "Airflow::Role", "entityId": 
"group2"},
+                        ],
+                    },
+                    {
+                        "identifier": {"entityType": "Airflow::Role", 
"entityId": "group1"},
+                    },
+                    {
+                        "identifier": {"entityType": "Airflow::Role", 
"entityId": "group2"},
+                    },
+                ],
+                {"decision": "ALLOW"},
+                True,
+            ),
+            # User without group without permission
+            (
+                None,
+                test_user_no_group,
+                None,
+                [
+                    {
+                        "identifier": {"entityType": "Airflow::User", 
"entityId": "test_user_no_group"},
+                        "parents": [],
+                    },
+                ],
+                {"decision": "DENY"},
+                False,
+            ),
+            # With entity fetcher but no resource ID
+            (
+                None,
+                test_user_no_group,
+                simple_entity_fetcher,
+                [
+                    {
+                        "identifier": {"entityType": "Airflow::User", 
"entityId": "test_user_no_group"},
+                        "parents": [],
+                    },
+                ],
+                {"decision": "DENY"},
+                False,
+            ),
+            # With entity fetcher and resource ID
+            (
+                "resource_id",
+                test_user_no_group,
+                simple_entity_fetcher,
+                [
+                    {
+                        "identifier": {"entityType": "Airflow::User", 
"entityId": "test_user_no_group"},
+                        "parents": [],
+                    },
+                    {"identifier": {"entityType": "Airflow::Variable", 
"entityId": "var1"}},
+                    {"identifier": {"entityType": "Airflow::Variable", 
"entityId": "var2"}},
+                ],
+                {"decision": "DENY"},
+                False,
+            ),
+        ],
+    )
+    def test_is_authorized_successful(
+        self, facade, entity_id, user, entity_fetcher, expected_entities, 
avp_response, expected
+    ):
+        mock_is_authorized = Mock(return_value=avp_response)
+        facade.avp_client.is_authorized = mock_is_authorized
+
+        method: ResourceMethod = "GET"
+        entity_type = AvpEntities.VARIABLE
+
+        with conf_vars(
+            {
+                ("aws_auth_manager", "avp_policy_store_id"): 
AVP_POLICY_STORE_ID,
+            }
+        ):
+            result = facade.is_authorized(
+                method=method,
+                entity_type=entity_type,
+                entity_id=entity_id,
+                user=user,
+                entity_fetcher=entity_fetcher,
+            )
+
+        mock_is_authorized.assert_called_once_with(
+            policyStoreId=AVP_POLICY_STORE_ID,
+            principal={"entityType": "Airflow::User", "entityId": 
user.get_id()},
+            action={"actionType": "Airflow::Action", "actionId": 
get_action_id(entity_type, method)},
+            resource={"entityType": get_entity_type(entity_type), "entityId": 
entity_id or "*"},
+            entities={"entityList": expected_entities},
+        )
+
+        assert result == expected
+
+    def test_is_authorized_unsuccessful(self, facade):
+        avp_response = {"errors": ["Error"]}
+        mock_is_authorized = Mock(return_value=avp_response)
+        facade.avp_client.is_authorized = mock_is_authorized
+
+        with conf_vars(
+            {
+                ("aws_auth_manager", "avp_policy_store_id"): 
AVP_POLICY_STORE_ID,
+            }
+        ):
+            with pytest.raises(
+                AirflowException, match="Error occurred while making an 
authorization decision."
+            ):
+                facade.is_authorized(method="GET", 
entity_type=AvpEntities.VARIABLE, user=test_user)
diff --git a/tests/providers/amazon/aws/auth_manager/test_aws_auth_manager.py 
b/tests/providers/amazon/aws/auth_manager/test_aws_auth_manager.py
index 56f23eb36a..9cc4fc602b 100644
--- a/tests/providers/amazon/aws/auth_manager/test_aws_auth_manager.py
+++ b/tests/providers/amazon/aws/auth_manager/test_aws_auth_manager.py
@@ -16,11 +16,14 @@
 # under the License.
 from __future__ import annotations
 
-from unittest.mock import patch
+from typing import TYPE_CHECKING
+from unittest.mock import ANY, Mock, patch
 
 import pytest
 from flask import Flask, session
 
+from airflow.auth.managers.models.resource_details import VariableDetails
+from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities
 from airflow.providers.amazon.aws.auth_manager.aws_auth_manager import 
AwsAuthManager
 from 
airflow.providers.amazon.aws.auth_manager.security_manager.aws_security_manager_override
 import (
     AwsSecurityManagerOverride,
@@ -29,6 +32,11 @@ from airflow.providers.amazon.aws.auth_manager.user import 
AwsAuthManagerUser
 from airflow.www.extensions.init_appbuilder import init_appbuilder
 from tests.test_utils.config import conf_vars
 
+if TYPE_CHECKING:
+    from airflow.auth.managers.base_auth_manager import ResourceMethod
+
+mock = Mock()
+
 
 @pytest.fixture
 def auth_manager():
@@ -66,6 +74,9 @@ def test_user():
 
 
 class TestAwsAuthManager:
+    def test_avp_facade(self, auth_manager):
+        assert hasattr(auth_manager, "avp_facade")
+
     @pytest.mark.db_test
     @patch.object(AwsAuthManager, "is_logged_in")
     def test_get_user(self, mock_is_logged_in, auth_manager, app, test_user):
@@ -99,6 +110,31 @@ class TestAwsAuthManager:
 
         assert result is False
 
+    @pytest.mark.parametrize(
+        "details, user, expected_user, expected_entity_id",
+        [
+            (None, None, ANY, None),
+            (VariableDetails(key="var1"), mock, mock, "var1"),
+        ],
+    )
+    @patch.object(AwsAuthManager, "avp_facade")
+    @patch.object(AwsAuthManager, "get_user")
+    def test_is_authorized_variable(
+        self, mock_get_user, mock_avp_facade, details, user, expected_user, 
expected_entity_id, auth_manager
+    ):
+        is_authorized = Mock()
+        mock_avp_facade.is_authorized = is_authorized
+
+        method: ResourceMethod = "GET"
+
+        auth_manager.is_authorized_variable(method=method, details=details, 
user=user)
+
+        if not user:
+            mock_get_user.assert_called_once()
+        is_authorized.assert_called_once_with(
+            method=method, entity_type=AvpEntities.VARIABLE, 
user=expected_user, entity_id=expected_entity_id
+        )
+
     
@patch("airflow.providers.amazon.aws.auth_manager.aws_auth_manager.url_for")
     def test_get_url_login(self, mock_url_for, auth_manager):
         auth_manager.get_url_login()
diff --git a/tests/providers/amazon/aws/auth_manager/test_constants.py 
b/tests/providers/amazon/aws/auth_manager/test_constants.py
index 3b273d9672..c40df2ec0e 100644
--- a/tests/providers/amazon/aws/auth_manager/test_constants.py
+++ b/tests/providers/amazon/aws/auth_manager/test_constants.py
@@ -17,6 +17,8 @@
 from __future__ import annotations
 
 from airflow.providers.amazon.aws.auth_manager.constants import (
+    CONF_AVP_POLICY_STORE_ID_KEY,
+    CONF_CONN_ID_KEY,
     CONF_ENABLE_KEY,
     CONF_SAML_METADATA_URL_KEY,
     CONF_SECTION_NAME,
@@ -24,11 +26,17 @@ from airflow.providers.amazon.aws.auth_manager.constants 
import (
 
 
 class TestAwsAuthManagerConstants:
+    def test_conf_enable_key(self):
+        assert CONF_ENABLE_KEY == "enable"
+
     def test_conf_section_name(self):
         assert CONF_SECTION_NAME == "aws_auth_manager"
 
+    def test_conf_conn_id_key(self):
+        assert CONF_CONN_ID_KEY == "conn_id"
+
     def test_conf_saml_metadata_url_key(self):
         assert CONF_SAML_METADATA_URL_KEY == "saml_metadata_url"
 
-    def test_conf_enable_key(self):
-        assert CONF_ENABLE_KEY == "enable"
+    def test_conf_avp_policy_store_id_key(self):
+        assert CONF_AVP_POLICY_STORE_ID_KEY == "avp_policy_store_id"
diff --git a/tests/providers/amazon/aws/auth_manager/test_user.py 
b/tests/providers/amazon/aws/auth_manager/test_user.py
index b518c2a831..6b5bb6da80 100644
--- a/tests/providers/amazon/aws/auth_manager/test_user.py
+++ b/tests/providers/amazon/aws/auth_manager/test_user.py
@@ -41,3 +41,6 @@ class TestAwsAuthManagerUser:
     def test_get_name_with_user_id(self, user):
         user.user_id = "user_id"
         assert user.get_name() == "user_id"
+
+    def test_get_groups(self, user):
+        assert user.get_groups() == []
diff --git a/airflow/providers/amazon/aws/auth_manager/constants.py 
b/tests/providers/amazon/aws/hooks/test_verified_permissions.py
similarity index 77%
copy from airflow/providers/amazon/aws/auth_manager/constants.py
copy to tests/providers/amazon/aws/hooks/test_verified_permissions.py
index f2f9c1da07..e7594dc3a5 100644
--- a/airflow/providers/amazon/aws/auth_manager/constants.py
+++ b/tests/providers/amazon/aws/hooks/test_verified_permissions.py
@@ -14,10 +14,12 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-# Configuration keys
 from __future__ import annotations
 
-CONF_SECTION_NAME = "aws_auth_manager"
-CONF_SAML_METADATA_URL_KEY = "saml_metadata_url"
-CONF_ENABLE_KEY = "enable"
+from airflow.providers.amazon.aws.hooks.verified_permissions import 
VerifiedPermissionsHook
+
+
+class TestVerifiedPermissionsHook:
+    def test_conn_attribute(self):
+        hook = VerifiedPermissionsHook()
+        assert hasattr(hook, "conn")


Reply via email to