pierrejeambrun commented on code in PR #47062:
URL: https://github.com/apache/airflow/pull/47062#discussion_r1979424699
##########
airflow/api_fastapi/core_api/security.py:
##########
@@ -75,23 +83,63 @@ async def get_user_with_exception_handling(request:
Request) -> BaseUser | None:
return get_user(token_str)
-def requires_access_dag(method: ResourceMethod, access_entity: DagAccessEntity
| None = None) -> Callable:
+def requires_access_dag(
+ method: ResourceMethod, access_entity: DagAccessEntity | None = None
+) -> Callable[[Request, BaseUser | None], None]:
def inner(
- dag_id: str | None = None,
+ request: Request,
user: Annotated[BaseUser | None, Depends(get_user)] = None,
) -> None:
- def callback():
- return get_auth_manager().is_authorized_dag(
- method=method, access_entity=access_entity,
details=DagDetails(id=dag_id), user=user
- )
+ dag_id: str | None = request.path_params.get("dag_id")
_requires_access(
- is_authorized_callback=callback,
+ is_authorized_callback=lambda:
get_auth_manager().is_authorized_dag(
+ method=method, access_entity=access_entity,
details=DagDetails(id=dag_id), user=user
+ )
)
return inner
+class PermittedDagFilter(BaseParam[set[str]]):
+ """A parameter that filters the permitted dags for the user."""
+
+ @classmethod
+ def depends(cls, *args: Any, **kwargs: Any) -> Self:
+ raise NotImplementedError("Use permitted_dag_filter_factory instead ,
depends is not implemented.")
+
+ def to_orm(self, select: Select) -> Select:
+ from airflow.models.dag import DagModel
Review Comment:
Not necessary. `DagModel` is already imported because we load the `dag_router`,
and it is also imported in a lot of other places (e.g. params).
##########
airflow/api_fastapi/core_api/security.py:
##########
@@ -75,23 +83,63 @@ async def get_user_with_exception_handling(request:
Request) -> BaseUser | None:
return get_user(token_str)
-def requires_access_dag(method: ResourceMethod, access_entity: DagAccessEntity
| None = None) -> Callable:
+def requires_access_dag(
+ method: ResourceMethod, access_entity: DagAccessEntity | None = None
+) -> Callable[[Request, BaseUser | None], None]:
def inner(
- dag_id: str | None = None,
+ request: Request,
user: Annotated[BaseUser | None, Depends(get_user)] = None,
) -> None:
- def callback():
- return get_auth_manager().is_authorized_dag(
- method=method, access_entity=access_entity,
details=DagDetails(id=dag_id), user=user
- )
+ dag_id: str | None = request.path_params.get("dag_id")
_requires_access(
- is_authorized_callback=callback,
+ is_authorized_callback=lambda:
get_auth_manager().is_authorized_dag(
+ method=method, access_entity=access_entity,
details=DagDetails(id=dag_id), user=user
+ )
)
return inner
+class PermittedDagFilter(BaseParam[set[str]]):
+ """A parameter that filters the permitted dags for the user."""
+
+ @classmethod
+ def depends(cls, *args: Any, **kwargs: Any) -> Self:
+ raise NotImplementedError("Use permitted_dag_filter_factory instead ,
depends is not implemented.")
+
+ def to_orm(self, select: Select) -> Select:
+ from airflow.models.dag import DagModel
+
+ # import here to avoid overhead of importing for other
`requires_access_*` functions
+ return select.where(DagModel.dag_id.in_(self.value))
+
+
+def permitted_dag_filter_factory(
+ methods: Container[ResourceMethod],
+) -> Callable[[Request, BaseUser], PermittedDagFilter]:
+ """
+ Create a callable for Depends in FastAPI that returns a filter of the
permitted dags for the user.
+
+ :param methods: The methods for which the dags are permitted.
+ :return: The callable that can be used as Depends in FastAPI.
+ """
+
+ def depends_permitted_dags_filter(
+ request: Request,
+ user: Annotated[BaseUser, Depends(get_user)],
Review Comment:
To be replaced (the `user` parameter).
##########
airflow/api_fastapi/core_api/security.py:
##########
@@ -75,23 +83,63 @@ async def get_user_with_exception_handling(request:
Request) -> BaseUser | None:
return get_user(token_str)
-def requires_access_dag(method: ResourceMethod, access_entity: DagAccessEntity
| None = None) -> Callable:
+def requires_access_dag(
+ method: ResourceMethod, access_entity: DagAccessEntity | None = None
+) -> Callable[[Request, BaseUser | None], None]:
def inner(
- dag_id: str | None = None,
+ request: Request,
user: Annotated[BaseUser | None, Depends(get_user)] = None,
) -> None:
- def callback():
- return get_auth_manager().is_authorized_dag(
- method=method, access_entity=access_entity,
details=DagDetails(id=dag_id), user=user
- )
+ dag_id: str | None = request.path_params.get("dag_id")
_requires_access(
- is_authorized_callback=callback,
+ is_authorized_callback=lambda:
get_auth_manager().is_authorized_dag(
+ method=method, access_entity=access_entity,
details=DagDetails(id=dag_id), user=user
+ )
)
return inner
+class PermittedDagFilter(BaseParam[set[str]]):
+ """A parameter that filters the permitted dags for the user."""
+
+ @classmethod
+ def depends(cls, *args: Any, **kwargs: Any) -> Self:
+ raise NotImplementedError("Use permitted_dag_filter_factory instead ,
depends is not implemented.")
+
+ def to_orm(self, select: Select) -> Select:
+ from airflow.models.dag import DagModel
+
+ # import here to avoid overhead of importing for other
`requires_access_*` functions
+ return select.where(DagModel.dag_id.in_(self.value))
+
+
+def permitted_dag_filter_factory(
+ methods: Container[ResourceMethod],
+) -> Callable[[Request, BaseUser], PermittedDagFilter]:
+ """
+ Create a callable for Depends in FastAPI that returns a filter of the
permitted dags for the user.
+
+ :param methods: The methods for which the dags are permitted.
Review Comment:
> :param methods: The methods for which the dags are permitted.
Needs rephrasing I think.
##########
airflow/api_fastapi/core_api/security.py:
##########
@@ -75,23 +83,63 @@ async def get_user_with_exception_handling(request:
Request) -> BaseUser | None:
return get_user(token_str)
-def requires_access_dag(method: ResourceMethod, access_entity: DagAccessEntity
| None = None) -> Callable:
+def requires_access_dag(
+ method: ResourceMethod, access_entity: DagAccessEntity | None = None
+) -> Callable[[Request, BaseUser | None], None]:
def inner(
- dag_id: str | None = None,
+ request: Request,
Review Comment:
Replace user with `GetUserDep`.
##########
airflow/api_fastapi/core_api/security.py:
##########
@@ -75,23 +83,63 @@ async def get_user_with_exception_handling(request:
Request) -> BaseUser | None:
return get_user(token_str)
-def requires_access_dag(method: ResourceMethod, access_entity: DagAccessEntity
| None = None) -> Callable:
+def requires_access_dag(
+ method: ResourceMethod, access_entity: DagAccessEntity | None = None
+) -> Callable[[Request, BaseUser | None], None]:
def inner(
- dag_id: str | None = None,
+ request: Request,
user: Annotated[BaseUser | None, Depends(get_user)] = None,
) -> None:
- def callback():
- return get_auth_manager().is_authorized_dag(
- method=method, access_entity=access_entity,
details=DagDetails(id=dag_id), user=user
- )
+ dag_id: str | None = request.path_params.get("dag_id")
_requires_access(
- is_authorized_callback=callback,
+ is_authorized_callback=lambda:
get_auth_manager().is_authorized_dag(
+ method=method, access_entity=access_entity,
details=DagDetails(id=dag_id), user=user
+ )
)
return inner
+class PermittedDagFilter(BaseParam[set[str]]):
Review Comment:
Just an idea.
I don't think those should be a `BaseParam`. They are not query parameters
or path parameters.
Nonetheless, it is important for them to implement the `to_orm` interface. We
could define a Protocol that both `BaseParam` and these permission types could
implement. Protocol typing is a little weird, so maybe just a base class:
`HasMappingRelation`, `ORMClause` or any other name (these ones are not great)
that just needs a `to_orm` method and maybe a `self.value`.
And then `paginated_select` takes `Sequence[ORMClause]`.
This would allow us to not have `class PermittedDagFilter(BaseParam[set[str]]):`
but just `class PermittedDagFilter(ORMClause[set[str]]):`, and not mislead the
reader into thinking that it is a query parameter or path parameter. And then
`class BaseParam(ORMClause, ABC):`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]