Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-23 Thread via GitHub


pierrejeambrun commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2559855816

   cc: @bbovenzi. Just merged; hoping this enables further development on the 
front-end side. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-23 Thread via GitHub


pierrejeambrun merged PR #44332:
URL: https://github.com/apache/airflow/pull/44332





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-23 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1895743176


##
airflow/api_fastapi/common/parameters.py:
##
@@ -601,3 +628,28 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
 QueryVariableKeyPatternSearch = Annotated[
     _SearchParam, Depends(search_param_factory(Variable.key, "variable_key_pattern"))
 ]
+
+
+# UI Shared
+def _optional_boolean(value: bool | None) -> bool | None:
+    return value if value is not None else False
+
+
+QueryIncludeUpstream = Annotated[Union[bool, None], AfterValidator(_optional_boolean)]

Review Comment:
   You are right. It will default to `False`. I adjusted it.






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-23 Thread via GitHub


bugraoz93 commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2559689679

   > Code and test cases look good; besides a couple of nits, ready to merge.
   
   Amazing news, just pushed the changes and resolved the threads. Thanks a lot 
for your detailed review!





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-22 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1895085064


##
airflow/api_fastapi/common/parameters.py:
##
@@ -163,6 +164,19 @@ def __init__(
         self.model = model
         self.to_replace = to_replace
 
+    def __or__(self, other):

Review Comment:
   Moved the `__or__` logic from `SortParam` to the view in a simple way. 
Thanks for the quick discussion! 
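
A rough sketch of what resolving the ordering in the view, rather than through `__or__`, might look like. It loosely mirrors the `get_dag_run_sort_param` helper quoted elsewhere in this thread, but `SortChoice`, `PRIMARY_KEY`, and `resolve_sort` are hypothetical stand-ins and not Airflow code.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class SortChoice:
    """Hypothetical stand-in for SortParam: just the requested column name."""

    value: str | None = None


PRIMARY_KEY = "id"  # assumed primary-key column name for this sketch


def resolve_sort(requested: SortChoice, run_ordering: list[str]) -> str:
    """Pick the sort column in the view instead of overloading ``__or__``."""
    # A request for anything other than the primary key wins.
    if requested.value and requested.value != PRIMARY_KEY:
        return requested.value
    # Otherwise fall back to the timetable's ordering, restricted to the
    # data-interval columns; anything else sorts by logical_date.
    for name in run_ordering:
        if name in ("data_interval_start", "data_interval_end"):
            return name
    return "logical_date"


print(resolve_sort(SortChoice("start_date"), ["data_interval_end"]))  # start_date
print(resolve_sort(SortChoice(None), ["data_interval_end"]))          # data_interval_end
```

Keeping this in one small function next to the view avoids teaching the shared parameter class about a rule that only the grid endpoint needs.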






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-22 Thread via GitHub


pierrejeambrun commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1894880241


##
airflow/api_fastapi/common/parameters.py:
##
@@ -163,6 +164,19 @@ def __init__(
         self.model = model
         self.to_replace = to_replace
 
+    def __or__(self, other):

Review Comment:
   Maybe putting the logic into the view is simpler at this point. The general 
case (as we have in this PR) would need more effort to cover all cases (PK), and 
I don't think it's worth it: that's the only filter with this requirement.






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-21 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1894664798


##
airflow/api_fastapi/common/parameters.py:
##
@@ -163,6 +164,19 @@ def __init__(
         self.model = model
         self.to_replace = to_replace
 
+    def __or__(self, other):

Review Comment:
   Yes, I was thinking the same: when the primary key is passed, this will 
always fall back to `other`. Maybe we can take an alternative approach similar 
to Search and Filter, creating the SortParam with a factory in this case. Do 
you mean something like that? Or just defining the `depend` method with a 
transform callback, where the transform is overridden in the view? 
   
   I am not sure this feature is needed for multiple use cases. This was the 
only logical comparison I could come up with without expanding the SortParam 
initialisation. I am okay with either option: moving the logic to the view 
without expanding SortParam, or expanding SortParam to handle the primary key 
case.
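
To make the primary-key concern concrete, here is a small sketch. `SortChoice` is a hypothetical stand-in, not the Airflow `SortParam`, and `"id"` is an assumed default. It contrasts a value-based `__or__` with a variant that records whether the caller supplied a value at all.

```python
from __future__ import annotations


class SortChoice:
    """Hypothetical stand-in for SortParam; not the Airflow class."""

    PRIMARY_KEY = "id"  # assumed default used when no order_by is sent

    def __init__(self, value: str | None = None) -> None:
        # Remember whether the caller supplied a value at all.
        self.explicit = value is not None
        self.value = value or self.PRIMARY_KEY

    def __or__(self, other: SortChoice) -> SortChoice:
        # Value-based check: cannot tell "defaulted to id" from "asked for id".
        return other if self.value == self.PRIMARY_KEY else self

    def or_explicit(self, other: SortChoice) -> SortChoice:
        # Flag-based check: only fall back when nothing was requested.
        return self if self.explicit else other


fallback = SortChoice("logical_date")
asked_for_pk = SortChoice("id")

print((asked_for_pk | fallback).value)           # logical_date (request lost)
print(asked_for_pk.or_explicit(fallback).value)  # id (request kept)
```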
   






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-21 Thread via GitHub


pierrejeambrun commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1894583867


##
airflow/api_fastapi/common/parameters.py:
##
@@ -163,6 +164,19 @@ def __init__(
         self.model = model
         self.to_replace = to_replace
 
+    def __or__(self, other):

Review Comment:
   I see why we do that.
   
   The only issue is that if someone explicitly requests ordering by 
"primary_key", this will end up falling back on the other, which is not what 
the user wants.
   
   Maybe we can work around it by using a single SortParam with all values 
allowed, putting that custom logic in the transform_callback, and setting the 
transform in the body of the view (`param.transform = lambda dag: dag…`).
   
   And then use it.
   
   Just an idea though.
   
   I'll do an in-depth review this weekend, but overall it looks good.
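
One way to read the transform idea above, as a sketch only: a single parameter object accepts every allowed value, and the view swaps in its own transform. `SortChoice`, `resolve`, and `grid_view` are hypothetical names invented for this illustration; the real `SortParam` may look quite different.

```python
from __future__ import annotations

from typing import Callable, Optional


class SortChoice:
    """Hypothetical stand-in for SortParam with an overridable transform."""

    def __init__(self, allowed: list[str]) -> None:
        self.allowed = allowed
        self.value: Optional[str] = None
        # Identity by default; a view may replace it.
        self.transform: Callable[[Optional[str]], Optional[str]] = lambda v: v

    def set_value(self, value: Optional[str]) -> "SortChoice":
        # Reject anything outside the allowed list; None means "not given".
        if value is not None and value not in self.allowed:
            raise ValueError(f"Ordering by {value!r} is not allowed")
        self.value = value
        return self

    def resolve(self) -> Optional[str]:
        # Apply the view-specific transform to the validated value.
        return self.transform(self.value)


def grid_view(order_by: SortChoice, run_ordering: list[str]) -> Optional[str]:
    # The view decides what "no explicit ordering" means for this endpoint.
    order_by.transform = lambda v: v if v is not None else run_ordering[0]
    return order_by.resolve()
```

With this shape, an explicit request always wins, because the transform only steps in when no value was given.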






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-20 Thread via GitHub


pierrejeambrun commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1894580171


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Request, status
+from sqlalchemy import select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    Range,
+    RangeFilter,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_child_task_map,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    limit: QueryLimit,
Review Comment:
   Disregard my comment then, as we plan on removing that.






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-20 Thread via GitHub


bugraoz93 commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2558002308

   I double-checked the parameters created with `filter_param_factory`, and 
they weren’t working as expected. 😕 To address this, I needed to adjust the 
`run_types` and `run_states` parameter names to `run_type` and `state`, 
respectively. It seems that `filter_param_factory` requires the parameter names 
to match the column names to function correctly. 
   
   Additionally, I have included the remaining two filter tests to ensure full 
coverage. With this update, all filters have been tested.
   
   Thanks!
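
The behaviour described above can be pictured with a toy factory. This is a hypothetical sketch, not how `filter_param_factory` is implemented: `make_filter` simply assumes that the query key it reads is the column name, so a differently named key is silently ignored.

```python
from __future__ import annotations

from typing import Callable


def make_filter(column: str) -> Callable[[dict[str, str]], tuple[str, str] | None]:
    """Hypothetical factory: the query key it reads is the column name."""

    def read(query: dict[str, str]) -> tuple[str, str] | None:
        # Only a key equal to the column name is picked up.
        value = query.get(column)
        return (column, value) if value is not None else None

    return read


state_filter = make_filter("state")
print(state_filter({"run_states": "success"}))  # None: key does not match
print(state_filter({"state": "success"}))       # ('state', 'success')
```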





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-20 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1894483321


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Request, status
+from sqlalchemy import select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    Range,
+    RangeFilter,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_child_task_map,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    limit: QueryLimit,
Review Comment:
   I assume this will always be decided by the UI?






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-20 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1894482932


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Request, status
+from sqlalchemy import select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    Range,
+    RangeFilter,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_child_task_map,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    limit: QueryLimit,
+    order_by: Annotated[
+        SortParam,
+        Depends(
+            SortParam(
+                ["logical_date", "data_interval_start", "data_interval_end", "start_date", "end_date"], DagRun
+            ).dynamic_depends()
+        ),
+    ],
+    include_upstream: QueryIncludeUpstream = False,
+    include_downstream: QueryIncludeDownstream = False,
+    logical_date_gte: OptionalDateTimeQuery = None,
+    logical_date_lte: OptionalDateTimeQuery = None,
+    root: str | None = None,
+) -> GridResponse:
+    """Return grid data."""
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    date_filter = RangeFilter(
+        Range(lower_bound=logical_date_gte, upper_bound=logical_date_lte),
+        attribute=DagRun.logical_date,
+    )
+    # Retrieve, sort and encode the previous DAG Runs
+    base_query = (
+        select(
+            DagRun.run_id,
+            DagRun.queued_at,
+            DagRun.start_date,
+            DagRun.end_date,
+            DagRun.state,
+            DagRun.run_type,
+            DagRun.data_interval_start,
+            DagRun.data_interval_end,
+            DagRun.dag_version_id.label("version_number"),
+            DagRunNote.content.label("note"),
Review Comment:
   I have updated this accordingly and switched the access of those elements 
from `execute` to `scalars`. This forced me to set the response object key by 
key rather than with `**dag_run`, but it is cleaner, thanks!
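
As a loose illustration of the trade-off mentioned here, using only the standard library: a mapping-like row can be unpacked with `**`, while an object-style result needs each field set by name. `RunRecord` and `RunResponse` are hypothetical stand-ins, and a plain `dict` stands in for a row mapping; none of this is the PR's code.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class RunRecord:
    """Hypothetical stand-in for an ORM object such as a DagRun."""

    run_id: str
    state: str


@dataclass
class RunResponse:
    """Hypothetical stand-in for the response model."""

    run_id: str
    state: str


# Column-style result: behaves like a mapping, so it can be unpacked.
row = {"run_id": "manual_1", "state": "success"}
from_row = RunResponse(**row)

# Object-style result: no mapping interface, so each field is set by name.
record = RunRecord(run_id="manual_1", state="success")
from_record = RunResponse(run_id=record.run_id, state=record.state)

assert from_row == from_record
```

Setting fields by name is more verbose, but it makes the response shape explicit and independent of the selected column labels.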






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-20 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1894482701


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Request, status
+from sqlalchemy import select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    Range,
+    RangeFilter,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_child_task_map,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    limit: QueryLimit,
+    order_by: Annotated[
+        SortParam,
+        Depends(
+            SortParam(
+                ["logical_date", "data_interval_start", "data_interval_end", "start_date", "end_date"], DagRun
+            ).dynamic_depends()
+        ),
+    ],
+    include_upstream: QueryIncludeUpstream = False,
+    include_downstream: QueryIncludeDownstream = False,
+    logical_date_gte: OptionalDateTimeQuery = None,
+    logical_date_lte: OptionalDateTimeQuery = None,
+    root: str | None = None,
+) -> GridResponse:
+    """Return grid data."""
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    date_filter = RangeFilter(
+        Range(lower_bound=logical_date_gte, upper_bound=logical_date_lte),
+        attribute=DagRun.logical_date,
+    )
+    # Retrieve, sort and encode the previous DAG Runs
+    base_query = (
+        select(
+            DagRun.run_id,
+            DagRun.queued_at,
+            DagRun.start_date,
+            DagRun.end_date,
+            DagRun.state,
+            DagRun.run_type,
+            DagRun.data_interval_start,
+            DagRun.data_interval_end,
+            DagRun.dag_version_id.label("version_number"),
+            DagRunNote.content.label("note"),
+        )
+        .join(DagRun.dag_run_note, isouter=True)
+        .select_from(DagRun)
+        .where(DagRun.dag_id == dag.dag_id)
+    )
+
+    dag_runs_select_filter, _ = paginated_select(
+        statement=base_query,
+        filters=[
+            run_types,
+            run_states,
+            date_filter,
+        ],
+        order_by=get_dag_run_sort_param(dag=dag, request_order_by=order_by),
+        offset=offset,
+        limit=limit,
+    )
+
+    dag_runs = session.execute(dag_runs_select_filter)
+
+    # Check if there are any DAG Runs with given criteria to eliminate unnecessary queries/errors
+    if not dag_runs:
+        return GridResponse(dag_runs=[])
+
+    # Retrieve, sort and encode the Task Instances
+    tis_of_dag_runs, _ = paginated_select(
+        statement=select(
+            TaskInstance.run_id,
+            TaskInstance.task_id,
+            TaskInstance.try_number,
+            TaskInstance.state,
+            TaskInstance.start_date,
+            Task

Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-20 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1894482263


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Request, status
+from sqlalchemy import select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    Range,
+    RangeFilter,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_child_task_map,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    limit: QueryLimit,
+    order_by: Annotated[
+        SortParam,
+        Depends(
+            SortParam(
+                ["logical_date", "data_interval_start", "data_interval_end", "start_date", "end_date"], DagRun
+            ).dynamic_depends()
+        ),
+    ],
+    include_upstream: QueryIncludeUpstream = False,
+    include_downstream: QueryIncludeDownstream = False,
+    logical_date_gte: OptionalDateTimeQuery = None,
+    logical_date_lte: OptionalDateTimeQuery = None,
+    root: str | None = None,
+) -> GridResponse:
+    """Return grid data."""
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    date_filter = RangeFilter(
+        Range(lower_bound=logical_date_gte, upper_bound=logical_date_lte),
+        attribute=DagRun.logical_date,
+    )
+    # Retrieve, sort and encode the previous DAG Runs

Review Comment:
   Thanks, removed!






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-20 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1894482182


##
airflow/api_fastapi/core_api/services/ui/grid.py:
##
@@ -0,0 +1,269 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import operator
+from functools import cache
+
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.api_fastapi.common.parameters import (
+SortParam,
+)
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+GridTaskInstanceSummary,
+)
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+
+def get_dag_run_sort_param(dag: DAG, request_order_by: SortParam) -> SortParam:
+    """
+    Get the Sort Param for the DAG Run.
+
+    Data interval columns are NULL for runs created before 2.3, but SQL's
+    NULL-sorting logic would make those old runs always appear first. In a
+    perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+    not efficient, so instead if the run_ordering is data_interval_start or data_interval_end,
+    we sort by logical_date instead.
+
+    :param dag: DAG
+    :param request_order_by: Request Order By
+
+    :return: Sort Param
+    """
+    if request_order_by and request_order_by.value != request_order_by.get_primary_key_string():
+        return request_order_by
+
+    sort_param = SortParam(
+        allowed_attrs=["logical_date", "data_interval_start", "data_interval_end"], model=DagRun
+    )
+
+    for name in dag.timetable.run_ordering:
+        if name in ("data_interval_start", "data_interval_end"):
+            return sort_param.set_value(name)
+
+    return sort_param.set_value("logical_date")
+
+
+@cache
+def get_task_group_children_getter() -> operator.methodcaller:
+    """Get the Task Group Children Getter for the DAG."""
+    sort_order = conf.get("webserver", "grid_view_sorting_order", fallback="topological")
+    if sort_order == "topological":
+        return operator.methodcaller("topological_sort")
+    if sort_order == "hierarchical_alphabetical":
+        return operator.methodcaller("hierarchical_alphabetical_sort")
+    raise AirflowConfigException(f"Unsupported grid_view_sorting_order: {sort_order}")
+
+
+def get_task_group_map(dag: DAG) -> dict[str, dict[str, Any]]:
+    """
+    Get the Task Group Map for the DAG.
+
+    :param dag: DAG
+
+    :return: Task Group Map
+    """
+    task_nodes: dict[str, dict[str, Any]] = {}
+
+    def _is_task_node_mapped_task_group(task_node: BaseOperator | MappedTaskGroup | TaskMap | None) -> bool:
+        """Check if the Task Node is a Mapped Task Group."""
+        return type(task_node) is MappedTaskGroup
+
+    def _append_child_task_count_to_parent(
+        child_task_count: int | MappedTaskGroup | TaskMap | MappedOperator | None,
+        parent_node: BaseOperator | MappedTaskGroup | TaskMap | None,
+    ):
+        """
+        Append the Child Task Count to the Parent.
+
+        This method should only be used for Mapped Models.
+        """
+        if isinstance(parent_node, TaskGroup):
+            # Remove the regular task counted in parent_node
+            task_nodes[parent_node.node_id]["task_count"].append(-1)
+            # Add the mapped task to the parent_node
+            task_nodes[parent_node.node_id]["task_count"].append(child_task_count)
+
+    def _fill_task_group_map(
+        task_node: BaseOperator | MappedTaskGroup | TaskMap | None,
+        parent_node: BaseOperator | MappedTaskGroup | TaskMap | None,
+    ):
+        """Recursively fill the Task Group Map."""
+        if task_node is None:
+            return
+        if isinstance(task_node, MappedOperator):
+            task_nodes[task_node.node_id] = {
+                "is_group": False,
+                "parent_id": parent_node.node_id if parent_node else None,
+   

Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-20 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1894481873


##
airflow/api_fastapi/core_api/services/ui/grid.py:
##
@@ -0,0 +1,269 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import operator
+from functools import cache
+
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.api_fastapi.common.parameters import (
+    SortParam,
+)
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridTaskInstanceSummary,
+)
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+
+def get_dag_run_sort_param(dag: DAG, request_order_by: SortParam) -> SortParam:

Review Comment:
   Yes, this method is gone now. Sorry for the time-consuming comments!



##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Request, status
+from sqlalchemy import select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    Range,
+    RangeFilter,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_child_task_map,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,

Review Comment:
   Removed, thanks!






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-20 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1894481273


##
airflow/api_fastapi/core_api/services/ui/grid.py:
##
@@ -0,0 +1,269 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import operator
+from functools import cache
+
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.api_fastapi.common.parameters import (
+    SortParam,
+)
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridTaskInstanceSummary,
+)
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+
+def get_dag_run_sort_param(dag: DAG, request_order_by: SortParam) -> SortParam:
+    """
+    Get the Sort Param for the DAG Run.
+
+    Data interval columns are NULL for runs created before 2.3, but SQL's
+    NULL-sorting logic would make those old runs always appear first. In a
+    perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+    not efficient, so instead if the run_ordering is data_interval_start or data_interval_end,
+    we sort by logical_date instead.
+
+    :param dag: DAG
+    :param request_order_by: Request Order By
+
+    :return: Sort Param
+    """
+    if request_order_by and request_order_by.value != request_order_by.get_primary_key_string():

Review Comment:
   You are right. This made me realise that what we really want to check is
whether order_by (SortParam) was left undefined, in which case it falls back to
the primary key. I included the `OR` method. I described this in my main
comment, so we can continue from there :pray:






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-20 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1894480500


##
tests/api_fastapi/core_api/routes/ui/test_grid.py:
##
@@ -0,0 +1,1194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import timedelta
+
+import pendulum
+import pytest
+
+from airflow.decorators import task_group
+from airflow.models import DagBag
+from airflow.operators.empty import EmptyOperator
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.state import DagRunState, TaskInstanceState
+from airflow.utils.task_group import TaskGroup
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
+
+from tests_common.test_utils.db import clear_db_assets, clear_db_dags, clear_db_runs, clear_db_serialized_dags
+from tests_common.test_utils.mock_operators import MockOperator
+
+pytestmark = pytest.mark.db_test
+
+DAG_ID = "test_dag"
+DAG_ID_2 = "test_dag_2"
+TASK_ID = "task"
+TASK_ID_2 = "task2"
+SUB_TASK_ID = "subtask"
+MAPPED_TASK_ID = "mapped_task"
+TASK_GROUP_ID = "task_group"
+INNER_TASK_GROUP = "inner_task_group"
+INNER_TASK_GROUP_SUB_TASK = "inner_task_group_sub_task"
+
+
+@pytest.fixture(autouse=True, scope="module")
+def examples_dag_bag():
+    # Speed up: We don't want example dags for this module
+    return DagBag(include_examples=False, read_dags_from_db=True)
+
+
+@pytest.fixture(autouse=True)
+@provide_session
+def setup(dag_maker, session=None):
+    clear_db_runs()
+    clear_db_dags()
+    clear_db_serialized_dags()
+
+    with dag_maker(dag_id=DAG_ID, serialized=True, session=session) as dag:
+        EmptyOperator(task_id=TASK_ID)
+
+        @task_group
+        def mapped_task_group(arg1):
+            return MockOperator(task_id=SUB_TASK_ID, arg1=arg1)
+
+        mapped_task_group.expand(arg1=["a", "b", "c"])
+        with TaskGroup(group_id=TASK_GROUP_ID):
+            MockOperator.partial(task_id=MAPPED_TASK_ID).expand(arg1=["a", "b", "c", "d"])
+            with TaskGroup(group_id=INNER_TASK_GROUP):
+                MockOperator.partial(task_id=INNER_TASK_GROUP_SUB_TASK).expand(arg1=["a", "b"])
+
+    triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST}
+    logical_date = timezone.datetime(2024, 11, 30)
+
+    data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
+    run_1 = dag_maker.create_dagrun(
+        run_id="run_1",
+        state=DagRunState.SUCCESS,
+        run_type=DagRunType.SCHEDULED,
+        logical_date=logical_date,
+        data_interval=data_interval,
+        **triggered_by_kwargs,
+    )
+    run_2 = dag_maker.create_dagrun(
+        run_id="run_2",
+        run_type=DagRunType.MANUAL,
+        state=DagRunState.FAILED,
+        logical_date=logical_date + timedelta(days=1),
+        data_interval=data_interval,
+        **triggered_by_kwargs,
+    )
+    for ti in run_1.task_instances:
+        ti.state = TaskInstanceState.SUCCESS
+    for ti in sorted(run_2.task_instances, key=lambda ti: (ti.task_id, ti.map_index)):
+        if ti.task_id == TASK_ID:
+            ti.state = TaskInstanceState.SUCCESS
+        elif ti.task_id == "group.mapped":
+            if ti.map_index == 0:
+                ti.state = TaskInstanceState.SUCCESS
+                ti.start_date = pendulum.DateTime(2024, 12, 30, 1, 0, 0, tzinfo=pendulum.UTC)
+                ti.end_date = pendulum.DateTime(2024, 12, 30, 1, 2, 3, tzinfo=pendulum.UTC)
+            elif ti.map_index == 1:
+                ti.state = TaskInstanceState.RUNNING
+                ti.start_date = pendulum.DateTime(2024, 12, 30, 2, 3, 4, tzinfo=pendulum.UTC)
+                ti.end_date = None
+
+    session.flush()
+
+    with dag_maker(dag_id=DAG_ID_2, serialized=True, session=session):
+        EmptyOperator(task_id=TASK_ID_2)
+
+
+@pytest.fixture(autouse=True)
+def _clean():
+    clear_db_runs()
+    clear_db_assets()
+    yield
+    clear_db_runs()
+    clear_db_assets()
+
+
+# Create this as a fixture so that it is applied before the `dag_with_runs` fixture is!
+@pytest.fixture(autouse=True)
+def _freeze_time_for_dagruns(time_machine):
+time_machine.move_to("2024-12-0

Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-20 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1894480411


##
tests/api_fastapi/core_api/routes/ui/test_grid.py:
##


Review Comment:
   Included more test cases.






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-20 Thread via GitHub


bugraoz93 commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2557854580

   >Can you check that limit is actually being respected? In my manual testing, 
I was trying to only fetch 14 runs but I always got the default of 100. 
@pierrejeambrun would you have any ideas here?
   
   I have checked and it seems like it is working fine. I have included a test 
case for it.
   
   >Can we add a test case for a triply nested dag group? Because it looks like 
we're only bubbling a failed state up twice.
   
   Indeed, this needs to be recalculated, since the state of every parent group
has to be updated accordingly. I have included a small calculation that cascades
the state to all parent groups recursively. I have also included test cases for
nested groups, and I have tested locally with a case that matches the picture
you sent, with a nesting depth of 3.
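
To illustrate the cascading described above, here is a minimal, self-contained sketch. It is not the code from this PR: the `STATE_PRIORITY` order, the `merge_state` and `cascade_state` helpers, and the node ids are hypothetical names chosen for the example, and states are plain strings rather than `TaskInstanceState` members.

```python
from __future__ import annotations

# Hypothetical priority order: earlier entries win when two states are merged.
STATE_PRIORITY = ["failed", "upstream_failed", "running", "queued", "success", None]


def merge_state(current: str | None, incoming: str | None) -> str | None:
    """Return whichever of the two states ranks higher in STATE_PRIORITY."""
    return min(current, incoming, key=STATE_PRIORITY.index)


def cascade_state(
    parent_of: dict[str, str | None],
    states: dict[str, str | None],
    node_id: str,
) -> None:
    """Walk from node_id up to the root, merging its state into every ancestor."""
    state = states.get(node_id)
    parent_id = parent_of.get(node_id)
    while parent_id is not None:
        states[parent_id] = merge_state(states.get(parent_id), state)
        parent_id = parent_of.get(parent_id)


# Three levels of nesting: outer > middle > inner > task (assumed ids).
parent_of = {
    "outer": None,
    "outer.middle": "outer",
    "outer.middle.inner": "outer.middle",
    "outer.middle.inner.task": "outer.middle.inner",
}
states = {
    "outer": "success",
    "outer.middle": "success",
    "outer.middle.inner": "success",
    "outer.middle.inner.task": "failed",
}

cascade_state(parent_of, states, "outer.middle.inner.task")
print(states["outer"])  # failed
```

Walking the parent chain in a loop, instead of updating only the direct parent and grandparent, is what lets the failed state reach a group at any nesting depth.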
   
   >As long as we have tests covering those edge cases we should be fine.
   
   I have included multiple test cases. I have also updated the code according 
to the comments. I will resolve them accordingly. 
   
   One general thing to mention: I have added an `OR` method to `SortParam`.
Since the `value` won't be `null`, the only case to check for is the generated
default, which falls back to the primary key. This let me remove the
unnecessary method and use `order_by | SortParam(, )` instead. If you think
this isn't generic enough to put on SortParam, I can move the OR logic to the
grid method.
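
The fallback idea behind such an `OR` method can be sketched roughly as follows. This is only an illustration under assumptions: `SortSketch` is a hypothetical stand-in and not Airflow's `SortParam`, and the `primary_key` field and `is_default` helper are invented for the example.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class SortSketch:
    """Hypothetical stand-in for a sort parameter; not the Airflow class."""

    value: str
    primary_key: str = "id"

    def is_default(self) -> bool:
        # No explicit ordering was requested, so the value is just the primary key
        # (optionally prefixed with "-" for descending order).
        return self.value.lstrip("-") == self.primary_key

    def __or__(self, other: SortSketch) -> SortSketch:
        # Keep an explicitly requested ordering; otherwise use the right-hand fallback.
        return other if self.is_default() else self


fallback = SortSketch("logical_date")
print((SortSketch("id") | fallback).value)  # logical_date
print((SortSketch("-start_date") | fallback).value)  # -start_date
```

With this shape the caller writes `requested | fallback` and needs no separate helper to decide which ordering applies.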
   
   ---
   
   Sorry for the delay! Many thanks for the tests and reviews! 





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-17 Thread via GitHub


bugraoz93 commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2548912021

   Amazing, thanks both of you for the open-minded approach and constructive 
feedback! :)
   I really appreciate the great work done here. Handling recursive logic like
this isn’t easy. This work would have taken much longer without that logic
already in place. I am focusing on adding the necessary tests and checking the
areas you’ve highlighted. The PR will be updated soon.





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-16 Thread via GitHub


bbovenzi commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2546737593

   Honestly, I don't think I did a great job with `dag_to_grid` so it can
definitely be improved. Either way, it's a lot of complicated recursive logic so
let's make sure we have tests to help us out.





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-16 Thread via GitHub


bbovenzi commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1887573480


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Request, status
+from sqlalchemy import select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    Range,
+    RangeFilter,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_child_task_map,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    limit: QueryLimit,

Review Comment:
   Actually, I recommended we remove that config in 
https://github.com/apache/airflow/issues/43519
   
   






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-16 Thread via GitHub


pierrejeambrun commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2545873207

   > I found the legacy implementation more complex to understand
   
   I'm likely biased because I'm familiar with the old one. A fresh eye is
always welcome, and if you feel that way I'm sure others will find this new
implementation easier to grasp. I'll just get used to it :).
   
   As long as we have tests covering those edge cases we should be fine.





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-16 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1886897530


##
airflow/api_fastapi/core_api/services/ui/grid.py:
##
@@ -0,0 +1,269 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import operator
+from functools import cache
+
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.api_fastapi.common.parameters import (
+    SortParam,
+)
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridTaskInstanceSummary,
+)
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+
+def get_dag_run_sort_param(dag: DAG, request_order_by: SortParam) -> SortParam:
+    """
+    Get the Sort Param for the DAG Run.
+
+    Data interval columns are NULL for runs created before 2.3, but SQL's
+    NULL-sorting logic would make those old runs always appear first. In a
+    perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+    not efficient, so instead if the run_ordering is data_interval_start or data_interval_end,
+    we sort by logical_date instead.
+
+    :param dag: DAG
+    :param request_order_by: Request Order By
+
+    :return: Sort Param
+    """
+    if request_order_by and request_order_by.value != request_order_by.get_primary_key_string():
+        return request_order_by
+
+    sort_param = SortParam(
+        allowed_attrs=["logical_date", "data_interval_start", "data_interval_end"], model=DagRun
+    )
+
+    for name in dag.timetable.run_ordering:
+        if name in ("data_interval_start", "data_interval_end"):
+            return sort_param.set_value(name)
+
+    return sort_param.set_value("logical_date")
+
+
+@cache
+def get_task_group_children_getter() -> operator.methodcaller:
+    """Get the Task Group Children Getter for the DAG."""
+    sort_order = conf.get("webserver", "grid_view_sorting_order", fallback="topological")
+    if sort_order == "topological":
+        return operator.methodcaller("topological_sort")
+    if sort_order == "hierarchical_alphabetical":
+        return operator.methodcaller("hierarchical_alphabetical_sort")
+    raise AirflowConfigException(f"Unsupported grid_view_sorting_order: {sort_order}")
+
+
+def get_task_group_map(dag: DAG) -> dict[str, dict[str, Any]]:
+    """
+    Get the Task Group Map for the DAG.
+
+    :param dag: DAG
+
+    :return: Task Group Map
+    """
+    task_nodes: dict[str, dict[str, Any]] = {}
+
+    def _is_task_node_mapped_task_group(task_node: BaseOperator | MappedTaskGroup | TaskMap | None) -> bool:
+        """Check if the Task Node is a Mapped Task Group."""
+        return type(task_node) is MappedTaskGroup
+
+    def _append_child_task_count_to_parent(
+        child_task_count: int | MappedTaskGroup | TaskMap | MappedOperator | None,
+        parent_node: BaseOperator | MappedTaskGroup | TaskMap | None,
+    ):
+        """
+        Append the Child Task Count to the Parent.
+
+        This method should only be used for Mapped Models.
+        """
+        if isinstance(parent_node, TaskGroup):
+            # Remove the regular task counted in parent_node
+            task_nodes[parent_node.node_id]["task_count"].append(-1)
+            # Add the mapped task to the parent_node
+            task_nodes[parent_node.node_id]["task_count"].append(child_task_count)
+
+    def _fill_task_group_map(
+        task_node: BaseOperator | MappedTaskGroup | TaskMap | None,
+        parent_node: BaseOperator | MappedTaskGroup | TaskMap | None,
+    ):
+        """Recursively fill the Task Group Map."""
+        if task_node is None:
+            return
+        if isinstance(task_node, MappedOperator):
+            task_nodes[task_node.node_id] = {
+                "is_group": False,
+                "parent_id": parent_node.node_id if parent_node else None,
+   

Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-16 Thread via GitHub


bugraoz93 commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2545723758

   >* Can you check that limit is actually being respected? In my manual 
testing, I was trying to only fetch 14 runs but I always got the default of 
100. @pierrejeambrun would you have any ideas here?
   >* Can we add a test case for a triply nested dag group? Because it looks 
like we're only bubbling a failed state up twice.
   
   Thanks for the comment! I will include test cases for more nested groups and 
test the limit accordingly. 
   
   >A few things to adjust before we can merge. Also some part of the logic are 
re-implementation (state propagation bug mentioned by Brent), I would suggest 
to try to keep things similar to the legacy implementation that has proven to 
be correct for many edge cases. Unless some stuff need improving of course, but 
the legacy code seems cleaner to me at this point. Maybe just because i'm more 
used to it.
   
   Thanks for the detailed review! 
   I found the legacy implementation more complex to understand, to be honest
:sweat_smile: For example, I couldn't easily grasp these edge cases (mostly for
`Mapped` objects), which weren't included in a couple of iterations. We have
covered most edge cases here too. Sorry if the methods without comments caused
confusion and made the review take longer!
   I am not against converting the new logic to the legacy implementation. In
which area would you like to see the legacy implementation? I copied most of
the logic from the legacy code, apart from building up the `task map` and
splitting the logic, which mostly lived under `dag_to_grid`, into multiple
methods.
   
   I will check these comments in detail today and start making the changes.





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-16 Thread via GitHub


pierrejeambrun commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1886819142


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Request, status
+from sqlalchemy import select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    Range,
+    RangeFilter,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_child_task_map,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    limit: QueryLimit,

Review Comment:
   I think we should default the limit to `default_dag_run_display_number` 
from the config at some point? (Same as we did for `num_runs` in the legacy 
implementation.)
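
A minimal sketch of the suggestion above, not code from the PR. It assumes a hypothetical `resolve_limit` helper and uses a plain dict as a stand-in for the Airflow configuration; the key name follows the comment, while the values and the upper bound are made up for illustration.

```python
from __future__ import annotations

# Hypothetical stand-in for the Airflow config; only the key name comes from
# the review comment, the value is illustrative.
FAKE_CONFIG = {"default_dag_run_display_number": 25}
MAX_LIMIT = 100  # assumed upper bound, not taken from the PR


def resolve_limit(requested: int | None, config: dict[str, int] = FAKE_CONFIG) -> int:
    """Use the caller's limit when given, else the configured default, capped at MAX_LIMIT."""
    if requested is None:
        requested = config["default_dag_run_display_number"]
    # Clamp to a sane range so a huge or negative value cannot reach the query.
    return max(0, min(requested, MAX_LIMIT))


print(resolve_limit(None))  # falls back to the configured default
print(resolve_limit(14))    # an explicit value is respected
```

Keeping the fallback in one small function would make it easy to test separately from the route.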






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-16 Thread via GitHub


pierrejeambrun commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1886777323


##
airflow/api_fastapi/core_api/services/ui/grid.py:
##
@@ -0,0 +1,269 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import operator
+from functools import cache
+
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.api_fastapi.common.parameters import (
+    SortParam,
+)
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridTaskInstanceSummary,
+)
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+
+def get_dag_run_sort_param(dag: DAG, request_order_by: SortParam) -> SortParam:
+    """
+    Get the Sort Param for the DAG Run.
+
+    Data interval columns are NULL for runs created before 2.3, but SQL's
+    NULL-sorting logic would make those old runs always appear first. In a
+    perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+    not efficient, so instead if the run_ordering is data_interval_start or data_interval_end,
+    we sort by logical_date instead.
+
+    :param dag: DAG
+    :param request_order_by: Request Order By
+
+    :return: Sort Param
+    """
+    if request_order_by and request_order_by.value != request_order_by.get_primary_key_string():

Review Comment:
   ```python
   if request_order_by
   ```
   
   How can this evaluate to false? Is this check necessary? The type 
annotation says `SortParam`.
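
For background on the question above: in Python, an instance is truthy unless its class defines `__bool__` or `__len__`. The sketch below uses two hypothetical classes (they are not Airflow's `SortParam`, whose definition is not shown in this thread) to show when a bare `if obj` check can actually be false.

```python
class PlainParam:
    """Hypothetical parameter object with no __bool__ or __len__."""

    def __init__(self, value):
        self.value = value


class EmptyAwareParam(PlainParam):
    """Hypothetical variant that opts in to falsiness when it carries no value."""

    def __bool__(self):
        return self.value is not None


# A plain instance is always truthy, even when it wraps None.
print(bool(PlainParam(None)))
# Only a class that defines __bool__ (or __len__) can make `if obj` false.
print(bool(EmptyAwareParam(None)))
```

If the annotated type never defines either method and is never `None`, the leading `if request_order_by` guard has no effect.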






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-16 Thread via GitHub


pierrejeambrun commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1886776829


##
airflow/api_fastapi/core_api/services/ui/grid.py:
##
@@ -0,0 +1,269 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import operator
+from functools import cache
+
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.api_fastapi.common.parameters import (
+    SortParam,
+)
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridTaskInstanceSummary,
+)
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+
+def get_dag_run_sort_param(dag: DAG, request_order_by: SortParam) -> SortParam:

Review Comment:
   I think this function needs refactoring. The logic is obscured: 'if no 
order_by param is specified, then we default to the first field of 
`timetable.run_ordering`'. Or at least add a comment; it took me 5 minutes to 
understand what was going on here.
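
One possible reading of the intended logic, written as a small standalone sketch. It is based only on the docstring and the comment above; `resolve_run_ordering` and its plain-string arguments are hypothetical and do not mirror the real `SortParam` API.

```python
from __future__ import annotations

# Columns that can be NULL for old runs, per the docstring in the diff.
DATA_INTERVAL_FIELDS = ("data_interval_start", "data_interval_end")


def resolve_run_ordering(requested: str | None, primary_key: str, run_ordering: list[str]) -> str:
    """Pick the column to sort DAG runs by (hypothetical helper)."""
    # 1. An explicit request from the client wins. A value equal to the primary
    #    key is treated as "nothing was requested".
    if requested is not None and requested != primary_key:
        return requested
    # 2. Otherwise default to the first field of the timetable's run_ordering.
    first = run_ordering[0] if run_ordering else primary_key
    # 3. Data interval columns are swapped for logical_date to avoid NULL-sorting issues.
    if first in DATA_INTERVAL_FIELDS:
        return "logical_date"
    return first


print(resolve_run_ordering(None, "id", ["data_interval_end", "logical_date"]))
```

Numbering the three cases in comments is one way to address the readability concern without changing behaviour.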






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-16 Thread via GitHub


pierrejeambrun commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1886595870


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Request, status
+from sqlalchemy import select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    Range,
+    RangeFilter,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_child_task_map,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    limit: QueryLimit,
+    order_by: Annotated[
+        SortParam,
+        Depends(
+            SortParam(
+                ["logical_date", "data_interval_start", "data_interval_end", "start_date", "end_date"], DagRun
+            ).dynamic_depends()
+        ),
+    ],
+    include_upstream: QueryIncludeUpstream = False,
+    include_downstream: QueryIncludeDownstream = False,
+    logical_date_gte: OptionalDateTimeQuery = None,
+    logical_date_lte: OptionalDateTimeQuery = None,
+    root: str | None = None,
+) -> GridResponse:
+    """Return grid data."""
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    date_filter = RangeFilter(
+        Range(lower_bound=logical_date_gte, upper_bound=logical_date_lte),
+        attribute=DagRun.logical_date,
+    )
+    # Retrieve, sort and encode the previous DAG Runs
+    base_query = (
+        select(
+            DagRun.run_id,
+            DagRun.queued_at,
+            DagRun.start_date,
+            DagRun.end_date,
+            DagRun.state,
+            DagRun.run_type,
+            DagRun.data_interval_start,
+            DagRun.data_interval_end,
+            DagRun.dag_version_id.label("version_number"),
+            DagRunNote.content.label("note"),
+        )
+        .join(DagRun.dag_run_note, isouter=True)
+        .select_from(DagRun)
+        .where(DagRun.dag_id == dag.dag_id)
+    )
+
+    dag_runs_select_filter, _ = paginated_select(
+        statement=base_query,
+        filters=[
+            run_types,
+            run_states,
+            date_filter,
+        ],
+        order_by=get_dag_run_sort_param(dag=dag, request_order_by=order_by),
+        offset=offset,
+        limit=limit,
+    )
+
+    dag_runs = session.execute(dag_runs_select_filter)
+
+    # Check if there are any DAG Runs with given criteria to eliminate unnecessary queries/errors
+    if not dag_runs:
+        return GridResponse(dag_runs=[])
+
+    # Retrieve, sort and encode the Task Instances
+    tis_of_dag_runs, _ = paginated_select(
+        statement=select(
+            TaskInstance.run_id,
+            TaskInstance.task_id,
+            TaskInstance.try_number,
+            TaskInstance.state,
+            TaskInstance.start_date,
+   

Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-13 Thread via GitHub


bbovenzi commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2541816275

   Thanks so much. We are nearly there!
   
   1. Can you check that `limit` is actually being respected? In my manual 
testing, I was trying to only fetch 14 runs but I always got the default of 
100. @pierrejeambrun would you have any ideas here?
   
   2. Can we add a test case for a triply nested dag group? Because it looks 
like we're only bubbling a failed state up twice.
   
   Current UI:
   https://github.com/user-attachments/assets/ad7f79dd-3cf5-48e6-80b6-9e735ed67056
   
   Legacy UI:
   https://github.com/user-attachments/assets/0ba6b17b-0cbd-4417-97d5-ed7023acba6d
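
To illustrate the second point, here is a rough sketch of how a failed state could bubble up through any depth of nesting. It is not the PR's implementation: the nested-dict layout, the `overall_state` helper and the shortened `STATE_PRIORITY` list are all assumptions made for the example.

```python
from __future__ import annotations

# Simplified, hypothetical priority order: earlier entries win.
STATE_PRIORITY = ["failed", "running", "queued", "success"]


def overall_state(node: dict) -> str | None:
    """Return a leaf's own state, or the highest-priority state among a group's children."""
    children = node.get("children")
    if children is None:
        return node.get("state")
    # Recurse first, so a failure at any depth reaches every ancestor group.
    child_states = {overall_state(child) for child in children}
    for state in STATE_PRIORITY:
        if state in child_states:
            return state
    return None


def leaf(state: str | None) -> dict:
    return {"state": state}


def group(*children: dict) -> dict:
    return {"children": list(children)}


# Triply nested group: the only failed task sits three levels down.
triple = group(leaf("success"), group(leaf("success"), group(leaf("success"), leaf("failed"))))
print(overall_state(triple))
```

A test case shaped like `triple` would catch an aggregation that stops after two levels.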
   
   





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-12 Thread via GitHub


bugraoz93 commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2539952587

   I fixed the mypy check; I am surprised that my local pre-commit didn't 
catch that. 
   Additionally, I changed the `child_state`; I agree, it can reduce confusion.





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-12 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1882763751


##
airflow/api_fastapi/core_api/datamodels/ui/grid.py:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import datetime
+from uuid import UUID
+
+from pydantic import BaseModel
+
+
+class GridTaskInstanceSummary(BaseModel):
+    """Task Instance Summary model for the Grid UI."""
+
+    task_id: str
+    try_number: int
+    start_date: datetime | None
+    end_date: datetime | None
+    queued_dttm: datetime | None
+    states: dict[str, int] | None
+    task_count: int
+    overall_state: str | None

Review Comment:
   Updated to TaskInstanceState






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-12 Thread via GitHub


bugraoz93 commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2539839347

   >We only want to count its immediate children and their states. So 
section_2, should only have a count of 2 and states of failed: 1, success: 1
   
   I misunderstood your earlier message about the representation of nested 
task groups. I have now split the calculation into smaller steps to ensure 
it is correct.
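
The rule quoted above can be sketched as follows. The data layout and the `immediate_summary` helper are hypothetical and only illustrate counting direct children; a nested group counts once, using its own already-computed state.

```python
from collections import Counter


def immediate_summary(group: dict) -> dict:
    """Count only the direct children of a group, keyed by each child's state."""
    children = group["children"]
    states = Counter(child["state"] for child in children)
    return {"task_count": len(children), "states": dict(states)}


# section_2 has one task and one nested group; the nested group's three tasks
# are not counted individually.
section_2 = {
    "id": "section_2",
    "children": [
        {"id": "task_1", "state": "success"},
        {
            "id": "inner_section_2",
            "state": "failed",  # already aggregated from its own children
            "children": [
                {"id": "task_2", "state": "success"},
                {"id": "task_3", "state": "failed"},
                {"id": "task_4", "state": "success"},
            ],
        },
    ],
}

print(immediate_summary(section_2))
```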





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-12 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1882748743


##
airflow/api_fastapi/core_api/datamodels/ui/grid.py:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import datetime
+from uuid import UUID
+
+from pydantic import BaseModel
+
+
+class GridTaskInstanceSummary(BaseModel):
+    """Task Instance Summary model for the Grid UI."""
+
+    task_id: str
+    try_number: int
+    start_date: datetime | None
+    end_date: datetime | None
+    queued_dttm: datetime | None
+    states: dict[str, int] | None
+    task_count: int
+    overall_state: str | None

Review Comment:
   Missed this one; pushing in a moment.






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-12 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1882748324


##
airflow/ui/openapi-gen/requests/types.gen.ts:
##
@@ -688,6 +688,47 @@ export type FastAPIAppResponse = {
   [key: string]: unknown | string;
 };
 
+/**
+ * DAG Run model for the Grid UI.
+ */
+export type GridDAGRunwithTIs = {
+  run_id: string;

Review Comment:
   Updated using aliases, thanks!






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-12 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1882746023


##
airflow/api_fastapi/core_api/datamodels/ui/grid.py:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import datetime
+from uuid import UUID
+
+from pydantic import BaseModel
+
+
+class GridTaskInstanceSummary(BaseModel):
+    """Task Instance Summary model for the Grid UI."""
+
+    task_id: str
+    try_number: int
+    start_date: datetime | None
+    end_date: datetime | None
+    queued_dttm: datetime | None
+    states: dict[str, int] | None
+    task_count: int
+    overall_state: str | None
+    note: str | None
+
+
+class GridDAGRunwithTIs(BaseModel):
+    """DAG Run model for the Grid UI."""
+
+    run_id: str
+    queued_at: datetime | None
+    start_date: datetime | None
+    end_date: datetime | None
+    state: str
+    run_type: str

Review Comment:
   Adjusted accordingly, thanks!






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-12 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1882745657


##
airflow/ui/openapi-gen/requests/types.gen.ts:
##
@@ -688,6 +688,47 @@ export type FastAPIAppResponse = {
   [key: string]: unknown | string;
 };
 
+/**
+ * DAG Run model for the Grid UI.
+ */
+export type GridDAGRunwithTIs = {
+  run_id: string;
+  queued_at: string | null;
+  start_date: string | null;
+  end_date: string | null;
+  state: string;
+  run_type: string;
+  data_interval_start: string | null;
+  data_interval_end: string | null;
+  version_number: string | null;
+  note: string | null;
+  task_instances: Array<GridTaskInstanceSummary>;
+};
+
+/**
+ * Response model for the Grid UI.
+ */
+export type GridResponse = {
+  dag_runs: Array<GridDAGRunwithTIs>;
+};
+
+/**
+ * Task Instance Summary model for the Grid UI.
+ */
+export type GridTaskInstanceSummary = {
+  task_id: string;
+  try_number: number;
+  start_date: string | null;
+  end_date: string | null;
+  queued_dttm: string | null;
+  states: {
+    [key: string]: number;
+  } | null;
+  task_count: number;
+  overall_state: string | null;

Review Comment:
   Adjusted accordingly






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-11 Thread via GitHub


bugraoz93 commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2536752788

   > This is coming along well! Thanks for all your patience as I test this 
endpoint out more and more.
   
   My pleasure! I am happy that we are ensuring everything will run as 
expected before shipping this one. Thanks for testing multiple times 
throughout the PR! :) 
   I will take a look at these soon. Thanks!





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-10 Thread via GitHub


bbovenzi commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1878752046


##
airflow/api_fastapi/core_api/datamodels/ui/grid.py:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import datetime
+from uuid import UUID
+
+from pydantic import BaseModel
+
+
+class GridTaskInstanceSummary(BaseModel):
+    """Task Instance Summary model for the Grid UI."""
+
+    task_id: str
+    try_number: int
+    start_date: datetime | None
+    end_date: datetime | None
+    queued_dttm: datetime | None
+    states: dict[str, int] | None
+    task_count: int
+    overall_state: str | None

Review Comment:
   ```suggestion
   overall_state: TaskInstanceState | None
   ```



##
airflow/api_fastapi/core_api/datamodels/ui/grid.py:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import datetime
+from uuid import UUID
+
+from pydantic import BaseModel
+
+
+class GridTaskInstanceSummary(BaseModel):
+    """Task Instance Summary model for the Grid UI."""
+
+    task_id: str
+    try_number: int
+    start_date: datetime | None
+    end_date: datetime | None
+    queued_dttm: datetime | None
+    states: dict[str, int] | None
+    task_count: int
+    overall_state: str | None

Review Comment:
   ```suggestion
   state: TaskInstanceState | None
   ```






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-10 Thread via GitHub


bbovenzi commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1878753011


##
airflow/api_fastapi/core_api/datamodels/ui/grid.py:
##
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import datetime
+from uuid import UUID
+
+from pydantic import BaseModel
+
+
+class GridTaskInstanceSummary(BaseModel):
+    """Task Instance Summary model for the Grid UI."""
+
+    task_id: str
+    try_number: int
+    start_date: datetime | None
+    end_date: datetime | None
+    queued_dttm: datetime | None
+    states: dict[str, int] | None
+    task_count: int
+    overall_state: str | None
+    note: str | None
+
+
+class GridDAGRunwithTIs(BaseModel):
+    """DAG Run model for the Grid UI."""
+
+    run_id: str
+    queued_at: datetime | None
+    start_date: datetime | None
+    end_date: datetime | None
+    state: str
+    run_type: str

Review Comment:
   ```suggestion
   state: DagRunState
   run_type: DagRunType
   ```
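
A minimal sketch of what enum-typed fields buy over plain `str`. It uses stand-in enums rather than Airflow's real `DagRunState` and `DagRunType` (the member lists below are assumptions for illustration only) and a plain dataclass in place of the pydantic model, so every `Sketch*` name is hypothetical:

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class SketchDagRunState(str, Enum):
    """Stand-in for Airflow's DagRunState; members are illustrative."""

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


class SketchDagRunType(str, Enum):
    """Stand-in for Airflow's DagRunType; members are illustrative."""

    MANUAL = "manual"
    SCHEDULED = "scheduled"


@dataclass
class SketchGridDagRun:
    """Hypothetical, trimmed-down version of the grid DAG run model."""

    run_id: str
    state: SketchDagRunState
    run_type: SketchDagRunType

    def __post_init__(self) -> None:
        # Coerce raw strings into enum members. Unknown values raise
        # ValueError, roughly what model validation would do for an enum field.
        self.state = SketchDagRunState(self.state)
        self.run_type = SketchDagRunType(self.run_type)


run = SketchGridDagRun(run_id="manual__1", state="success", run_type="manual")

try:
    SketchGridDagRun(run_id="manual__2", state="succes", run_type="manual")
    typo_rejected = False
except ValueError:
    typo_rejected = True
```

With `str` fields the misspelled `"succes"` would pass through silently. With enum fields it is rejected at construction time, and the generated client types can list the allowed values.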






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-10 Thread via GitHub


bbovenzi commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1878570856


##
airflow/ui/openapi-gen/requests/types.gen.ts:
##
@@ -688,6 +688,47 @@ export type FastAPIAppResponse = {
   [key: string]: unknown | string;
 };
 
+/**
+ * DAG Run model for the Grid UI.
+ */
+export type GridDAGRunwithTIs = {
+  run_id: string;

Review Comment:
   ```suggestion
 dag_run_id: string;
   ```
   
   Let's try to be more consistent with the DagRunResponse



##
airflow/ui/openapi-gen/requests/types.gen.ts:
##
@@ -688,6 +688,47 @@ export type FastAPIAppResponse = {
   [key: string]: unknown | string;
 };
 
+/**
+ * DAG Run model for the Grid UI.
+ */
+export type GridDAGRunwithTIs = {
+  run_id: string;
+  queued_at: string | null;
+  start_date: string | null;
+  end_date: string | null;
+  state: string;
+  run_type: string;
+  data_interval_start: string | null;
+  data_interval_end: string | null;
+  version_number: string | null;
+  note: string | null;
+  task_instances: Array<GridTaskInstanceSummary>;
+};
+
+/**
+ * Response model for the Grid UI.
+ */
+export type GridResponse = {
+  dag_runs: Array<GridDAGRunwithTIs>;
+};
+
+/**
+ * Task Instance Summary model for the Grid UI.
+ */
+export type GridTaskInstanceSummary = {
+  task_id: string;
+  try_number: number;
+  start_date: string | null;
+  end_date: string | null;
+  queued_dttm: string | null;
+  states: {
+    [key: string]: number;
+  } | null;
+  task_count: number;
+  overall_state: string | null;

Review Comment:
   ```suggestion
 state: string | null;
   ```
   
   I was trying to use this endpoint locally and it is a lot easier if this is 
closer to `TaskInstanceResponse`
   
   Perhaps we should change `states` to be `child_states` to avoid confusion on 
variable names?






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-09 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1876902265


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Request, status
+from sqlalchemy import func, select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+from airflow.utils import timezone
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    order_by: Annotated[
+        SortParam,
+        Depends(
+            SortParam(
+                ["logical_date", "data_interval_start", "data_interval_end", "start_date", "end_date"], DagRun
+            ).dynamic_depends()
+        ),
+    ],
+    include_upstream: QueryIncludeUpstream = False,
+    include_downstream: QueryIncludeDownstream = False,
+    base_date: OptionalDateTimeQuery = None,
Review Comment:
   Makes sense! It adds more flexibility. I included it as a `Range` query, which requires two inputs.






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-09 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1876901733


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Request, status
+from sqlalchemy import func, select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+from airflow.utils import timezone
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,

Review Comment:
   For sure, adjusted






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-09 Thread via GitHub


bbovenzi commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1876872994


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Request, status
+from sqlalchemy import func, select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+from airflow.utils import timezone
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,

Review Comment:
   Let's rename this to `limit` to be consistent with all other pagination
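
For reference, a minimal sketch of the `limit`/`offset` convention the rename aligns with. The `paginate` helper and its default of 100 are hypothetical, chosen only for illustration, and not Airflow's actual `QueryLimit` behaviour:

```python
from __future__ import annotations

from typing import Sequence, TypeVar

T = TypeVar("T")


def paginate(items: Sequence[T], limit: int = 100, offset: int = 0) -> list[T]:
    """Return at most `limit` items, skipping the first `offset` items."""
    if limit < 0 or offset < 0:
        raise ValueError("limit and offset must be non-negative")
    return list(items[offset : offset + limit])


run_ids = [f"run_{i}" for i in range(10)]
first_page = paginate(run_ids, limit=3, offset=0)
second_page = paginate(run_ids, limit=3, offset=3)
```

Using the same two parameter names on every endpoint lets a client reuse one paging helper instead of special-casing `num_runs` for the grid.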






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-09 Thread via GitHub


bugraoz93 commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2529694214

   > After more manual testing. It looks like we have a few issues.
   > 
   > Using the `example_task_group` Dag:
   > 
   > * `section_1` is correct if everything runs normally. But manually mark a 
child task as failed. Then the `states` dict is correct but overall_state was 
not updated
   > * `section_2` the `task_count` is correct at 2 if you assume its nested 
task group is a single task but the list of `states` shows 4 states which would 
be correct if you ignore task groups and only count the actual number of tasks 
no matter how nested they may be.
   
   Thanks a lot for the review and additional tests! 
   I have adjusted the code accordingly. It was indeed missing recursive handling of nested 
`task_groups`, and the `overall_state` calculation had a problem with the order of the 
loops :sweat_smile: 
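
A minimal sketch of the recursive aggregation discussed above, under stated assumptions: the nested-dict shape, the `STATE_PRIORITY` order, and the helper names are all hypothetical and not the code in this PR. It takes the second reading from the review (count every real task, however deeply nested); the PR may have settled on a different convention.

```python
from __future__ import annotations

from collections import Counter

# Assumed priority order for this sketch only: the first state present wins.
STATE_PRIORITY = ["failed", "running", "queued", "success"]


def leaf_states(node: dict) -> list[str | None]:
    """Collect the states of all real tasks under a node, at any nesting depth."""
    if "children" not in node:
        return [node["state"]]
    states: list[str | None] = []
    for child in node["children"]:
        states.extend(leaf_states(child))
    return states


def summarize(group: dict) -> dict:
    """Build a summary whose task_count and per-state counts always agree."""
    states = leaf_states(group)
    child_states = Counter(s for s in states if s is not None)
    # Derive the overall state only after every leaf has been counted.
    overall = next((s for s in STATE_PRIORITY if child_states[s] > 0), None)
    return {"task_count": len(states), "child_states": dict(child_states), "state": overall}


# Shaped like `section_2`: one task plus a nested group of three tasks,
# with one nested task manually marked as failed.
section_2 = {
    "children": [
        {"state": "success"},
        {"children": [{"state": "success"}, {"state": "failed"}, {"state": "success"}]},
    ]
}
summary = summarize(section_2)
```

Because the overall state is derived from the finished counts rather than updated inside the loop, a failed task deep inside a nested group still surfaces at the top level.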





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-09 Thread via GitHub


bbovenzi commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1876873780


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Request, status
+from sqlalchemy import func, select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+from airflow.utils import timezone
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    order_by: Annotated[
+        SortParam,
+        Depends(
+            SortParam(
+                ["logical_date", "data_interval_start", "data_interval_end", "start_date", "end_date"], DagRun
+            ).dynamic_depends()
+        ),
+    ],
+    include_upstream: QueryIncludeUpstream = False,
+    include_downstream: QueryIncludeDownstream = False,
+    base_date: OptionalDateTimeQuery = None,

Review Comment:
   I think we should also change this to lte and gte datetime params like we do 
for other endpoints.
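
To illustrate the difference, a minimal sketch of an inclusive `gte`/`lte` pair. The `in_range` helper is hypothetical and not an Airflow API. A single `base_date` can only express an upper bound, whereas two optional bounds cover "before", "after", and "between":

```python
from __future__ import annotations

from datetime import datetime, timezone


def in_range(value: datetime, gte: datetime | None = None, lte: datetime | None = None) -> bool:
    """Return True when value lies within the optional inclusive bounds."""
    if gte is not None and value < gte:
        return False
    if lte is not None and value > lte:
        return False
    return True


logical_dates = [datetime(2024, 12, day, tzinfo=timezone.utc) for day in (1, 5, 9)]

# Equivalent to the old base_date behaviour: an upper bound only.
upper_only = [d for d in logical_dates if in_range(d, lte=datetime(2024, 12, 5, tzinfo=timezone.utc))]

# Something base_date alone could not express: a closed window.
both_bounds = [
    d
    for d in logical_dates
    if in_range(
        d,
        gte=datetime(2024, 12, 2, tzinfo=timezone.utc),
        lte=datetime(2024, 12, 9, tzinfo=timezone.utc),
    )
]
```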






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-09 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1876867323


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,210 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+
+from fastapi import HTTPException, Request, status
+from sqlalchemy import func, select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+from airflow.utils import timezone
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    include_upstream: QueryIncludeUpstream = False,
+    include_downstream: QueryIncludeDownstream = False,
+    base_date: OptionalDateTimeQuery = None,
+    root: str | None = None,
+) -> GridResponse:
+    """Return grid data."""
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    current_time = timezone.utcnow()
+    # Retrieve, sort and encode the previous DAG Runs
+    base_query = (
+        select(
+            DagRun.run_id,
+            DagRun.queued_at,
+            DagRun.start_date,
+            DagRun.end_date,
+            DagRun.state,
+            DagRun.run_type,
+            DagRun.data_interval_start,
+            DagRun.data_interval_end,
+            DagRun.dag_version_id.label("version_number"),
+            DagRunNote.content.label("note"),
+        )
+        .join(DagRun.dag_run_note, isouter=True)
+        .select_from(DagRun)
+        .where(DagRun.dag_id == dag.dag_id, DagRun.logical_date <= func.coalesce(base_date, current_time))
+    )
+
+    dag_runs_select_filter, _ = paginated_select(
+        statement=base_query,
+        filters=[
+            run_types,
+            run_states,
+        ],
+        order_by=get_dag_run_sort_param(dag=dag),

Review Comment:
   Included as a parameter. If it isn't supplied, the default logic applies. I have 
also included a unit test for `order_by`. Please let me know if this works.






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-09 Thread via GitHub


bbovenzi commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1876359629


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,210 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+
+from fastapi import HTTPException, Request, status
+from sqlalchemy import func, select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+from airflow.utils import timezone
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    include_upstream: QueryIncludeUpstream = False,
+    include_downstream: QueryIncludeDownstream = False,
+    base_date: OptionalDateTimeQuery = None,
+    root: str | None = None,
+) -> GridResponse:
+    """Return grid data."""
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    current_time = timezone.utcnow()
+    # Retrieve, sort and encode the previous DAG Runs
+    base_query = (
+        select(
+            DagRun.run_id,
+            DagRun.queued_at,
+            DagRun.start_date,
+            DagRun.end_date,
+            DagRun.state,
+            DagRun.run_type,
+            DagRun.data_interval_start,
+            DagRun.data_interval_end,
+            DagRun.dag_version_id.label("version_number"),
+            DagRunNote.content.label("note"),
+        )
+        .join(DagRun.dag_run_note, isouter=True)
+        .select_from(DagRun)
+        .where(DagRun.dag_id == dag.dag_id, DagRun.logical_date <= func.coalesce(base_date, current_time))
+    )
+
+    dag_runs_select_filter, _ = paginated_select(
+        statement=base_query,
+        filters=[
+            run_types,
+            run_states,
+        ],
+        order_by=get_dag_run_sort_param(dag=dag),

Review Comment:
   Testing locally, this isn't getting the most recent first. And I think we 
need an `order_by` param for the user to customize.
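
A minimal sketch of what such a parameter could look like, assuming a leading `-` means descending and a default of most-recent `start_date` first. The `sort_runs` helper, the default, and the field whitelist are illustrative assumptions, not the PR's `SortParam` implementation:

```python
from __future__ import annotations

# Hypothetical whitelist of sortable DAG run fields for this sketch.
ALLOWED_SORT_FIELDS = ["logical_date", "data_interval_start", "data_interval_end", "start_date", "end_date"]


def sort_runs(runs: list[dict], order_by: str = "-start_date") -> list[dict]:
    """Sort runs by a whitelisted field; a leading '-' means descending."""
    descending = order_by.startswith("-")
    field = order_by[1:] if descending else order_by
    if field not in ALLOWED_SORT_FIELDS:
        raise ValueError(f"Ordering by '{field}' is not allowed")
    return sorted(runs, key=lambda run: run[field], reverse=descending)


runs = [
    {"run_id": "a", "start_date": "2024-12-01", "logical_date": "2024-12-01"},
    {"run_id": "b", "start_date": "2024-12-09", "logical_date": "2024-12-03"},
    {"run_id": "c", "start_date": "2024-12-05", "logical_date": "2024-12-02"},
]

newest_first = [run["run_id"] for run in sort_runs(runs)]
by_logical_date = [run["run_id"] for run in sort_runs(runs, "logical_date")]
```

Whitelisting the field names keeps arbitrary column names out of the query while still letting the caller choose between start date, data interval, and logical date.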






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-09 Thread via GitHub


bbovenzi commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1876355991


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,210 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+
+from fastapi import HTTPException, Request, status
+from sqlalchemy import func, select
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryIncludeDownstream,
+    QueryIncludeUpstream,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.grid import (
+    fill_task_instance_summaries,
+    get_dag_run_sort_param,
+    get_task_group_map,
+)
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagrun import DagRunNote
+from airflow.models.taskinstance import TaskInstanceNote
+from airflow.utils import timezone
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    include_upstream: QueryIncludeUpstream = False,
+    include_downstream: QueryIncludeDownstream = False,
+    base_date: OptionalDateTimeQuery = None,
+    root: str | None = None,
+) -> GridResponse:
+    """Return grid data."""
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    current_time = timezone.utcnow()
+    # Retrieve, sort and encode the previous DAG Runs
+    base_query = (

Review Comment:
   We need an order_by query for the dag runs. It should be the most recent 
start_date but some users may want to use data interval start or logical date 
instead.






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-08 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1875054222


##
airflow/api_fastapi/common/parameters.py:
##
@@ -572,3 +636,7 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
 QueryAssetDagIdPatternSearch = Annotated[
     _DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
 ]
+
+# UI Shared
+QueryIncludeUpstream = Annotated[Union[bool, None], Depends(lambda: False)]
+QueryIncludeDownstream = Annotated[Union[bool, None], Depends(lambda: False)]

Review Comment:
   I have pushed some changes, but it’s essentially the same as `bool | None = 
False` :sweat_smile: 






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-08 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1875052154


##
airflow/api_fastapi/core_api/routes/ui/service/grid.py:
##


Review Comment:
   Yeah, it should be at a higher level. My bad too! 😅 Thanks!






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-08 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1875051563


##
airflow/api_fastapi/core_api/routes/ui/service/grid.py:
##
@@ -0,0 +1,205 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import operator
+from functools import cache
+
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.api_fastapi.common.parameters import (
+    BaseParam,
+    SortParam,
+)
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridTaskInstanceSummary,
+)
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+
+def get_dag_run_sort_param(dag: DAG) -> BaseParam:
+    """
+    Get the Sort Param for the DAG Run.
+
+    Data interval columns are NULL for runs created before 2.3, but SQL's
+    NULL-sorting logic would make those old runs always appear first. In a
+    perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+    not efficient, so instead if the run_ordering is data_interval_start or data_interval_end,
+    we sort by logical_date instead.
+
+    :param dag: DAG
+
+    :return: Sort Param
+    """
+    for name in dag.timetable.run_ordering:
+        if name in ("data_interval_start", "data_interval_end"):
+            return SortParam(
+                allowed_attrs=["logical_date", "data_interval_start", "data_interval_end"], model=DagRun

Review Comment:
   The `SortParam` is now created once, and the function returns the matching
   `set_value(...)` result.
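
The docstring quoted in the hunk above says that ordering on a nullable data interval column misplaces rows whose interval is NULL, and that ordering on `logical_date` avoids this. A minimal sketch of that effect, assuming a throwaway in-memory SQLite table (the table layout and the run ids are hypothetical, not Airflow's schema):

```python
import sqlite3


def ordered_run_ids(order_by: str) -> list:
    """Return run ids from a throwaway in-memory table, sorted descending."""
    # Hypothetical, simplified stand-in for the dag_run table.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE dag_run (run_id TEXT, logical_date TEXT, data_interval_start TEXT)"
    )
    conn.executemany(
        "INSERT INTO dag_run VALUES (?, ?, ?)",
        [
            ("run_2021", "2021-01-01", "2021-01-01"),
            # A row whose data interval column was never populated.
            ("run_2022_legacy", "2022-01-01", None),
            ("run_2023", "2023-01-01", "2023-01-01"),
        ],
    )
    # order_by only ever receives the fixed column names used below.
    rows = conn.execute(
        f"SELECT run_id FROM dag_run ORDER BY {order_by} DESC"
    ).fetchall()
    conn.close()
    return [run_id for (run_id,) in rows]


# Sorting on the nullable column pushes the NULL row to one end of the list.
by_interval = ordered_run_ids("data_interval_start")
# Sorting on logical_date keeps the rows in chronological order.
by_logical_date = ordered_run_ids("logical_date")
```

SQLite places NULLs last under `DESC`, while PostgreSQL places them first by default, so the exact position of the NULL row depends on the backend. Ordering on a column that is always populated removes that dependence.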






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-08 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1875050610


##
airflow/api_fastapi/common/parameters.py:
##
@@ -112,6 +112,69 @@ def depends(self, only_active: bool = True) -> _OnlyActiveFilter:
         return self.set_value(only_active)
 
 
+class DagIdsFilter(BaseParam[list[str]]):
+    """Filter on dag ids."""
+
+    def __init__(self, model: Base, value: list[str] | None = None, skip_none: bool = True) -> None:
+        super().__init__(value, skip_none)
+        self.model = model
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value and self.skip_none:

Review Comment:
   I have updated it to use `filter_param_factory`. Some of these methods were
   added by mistake while I was rebasing; I have deleted them :)






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-08 Thread via GitHub


bugraoz93 commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2526388783

   > Can you also plug the new common filters `include_upstream` and
   > `include_downstream` in `structure_data` (`structure.py`)
   
   I already included this in the previous commit: that endpoint's params were
   updated to follow the changes to the common `upstream|downstream` variables :)





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-06 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1872765365


##
airflow/api_fastapi/common/parameters.py:
##
@@ -572,3 +636,7 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
 QueryAssetDagIdPatternSearch = Annotated[
     _DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
 ]
+
+# UI Shared
+QueryIncludeUpstream = Annotated[Union[bool, None], Depends(lambda: False)]
+QueryIncludeDownstream = Annotated[Union[bool, None], Depends(lambda: False)]

Review Comment:
   I have adjusted the other parts. I am going to add tests with
   `include_upstream=True, include_downstream=False` and with
   `include_upstream=False, include_downstream=True`, and will try to push all
   the changes soon.
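
To make the two test combinations above concrete, here is a small sketch of what each flag would select around a root task. It is a simplified stand-in, not Airflow's `dag.partial_subset`; the four-task pipeline and the `subset` helper are hypothetical.

```python
from collections import deque

# Hypothetical four-task pipeline: extract -> transform -> load -> report.
DOWNSTREAM = {
    "extract": ["transform"],
    "transform": ["load"],
    "load": ["report"],
    "report": [],
}


def _reachable(start, edges):
    """Collect every task reachable from start by following edges."""
    seen = set()
    queue = deque(edges[start])
    while queue:
        task = queue.popleft()
        if task not in seen:
            seen.add(task)
            queue.extend(edges[task])
    return seen


def subset(root, include_upstream=False, include_downstream=False):
    """Return root plus the requested relatives (illustrative only)."""
    # Invert the downstream edges to walk towards upstream tasks.
    upstream_edges = {task: [] for task in DOWNSTREAM}
    for parent, children in DOWNSTREAM.items():
        for child in children:
            upstream_edges[child].append(parent)

    selected = {root}
    if include_upstream:
        selected |= _reachable(root, upstream_edges)
    if include_downstream:
        selected |= _reachable(root, DOWNSTREAM)
    return selected


upstream_only = subset("transform", include_upstream=True)
downstream_only = subset("transform", include_downstream=True)
```

With both flags left at `False`, only the root itself is selected, which is why each flag needs its own test case.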






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-06 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1872762494


##
airflow/api_fastapi/common/parameters.py:
##
@@ -572,3 +636,7 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
 QueryAssetDagIdPatternSearch = Annotated[
     _DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
 ]
+
+# UI Shared
+QueryIncludeUpstream = Annotated[Union[bool, None], Depends(lambda: False)]
+QueryIncludeDownstream = Annotated[Union[bool, None], Depends(lambda: False)]

Review Comment:
   I agree, this does indeed always return `False`; it slipped past me. These
   flags have never been an actual query filter for the DB calls. Instead, they
   are passed to the `dag.partial_subset` method, which doesn't accept a query.
   
   I think we can treat this as a simple `Annotated` parameter, similar to
   `DateTimeQuery`, with a small validator that falls back to `False` when the
   value is not passed with the request. While I know this won't be an actual
   `query`, should we consider using DB calls to retrieve the `partial_subset`
   at that point? What do you think?
   
   ```python
   def _optional_boolean(value: bool | None) -> bool | None:
       return value if value is not None else False
   
   QueryIncludeUpstream = Annotated[Union[bool, None], AfterValidator(_optional_boolean)]
   QueryIncludeDownstream = Annotated[Union[bool, None], AfterValidator(_optional_boolean)]
   ```






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-05 Thread via GitHub


pierrejeambrun commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2519980071

   Can you also plug the new common filters `include_upstream` and
   `include_downstream` into `structure_data` (`structure.py`)?





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-05 Thread via GitHub


pierrejeambrun commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1871162514


##
airflow/api_fastapi/common/parameters.py:
##
@@ -572,3 +636,7 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
 QueryAssetDagIdPatternSearch = Annotated[
     _DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
 ]
+
+# UI Shared
+QueryIncludeUpstream = Annotated[Union[bool, None], Depends(lambda: False)]
+QueryIncludeDownstream = Annotated[Union[bool, None], Depends(lambda: False)]

Review Comment:
   This makes me realize that we need a test case for `grid_data` with
   `include_upstream=True` and `include_downstream=False`.






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-05 Thread via GitHub


pierrejeambrun commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1871155810


##
airflow/api_fastapi/common/parameters.py:
##
@@ -572,3 +636,7 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
 QueryAssetDagIdPatternSearch = Annotated[
     _DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
 ]
+
+# UI Shared
+QueryIncludeUpstream = Annotated[Union[bool, None], Depends(lambda: False)]
+QueryIncludeDownstream = Annotated[Union[bool, None], Depends(lambda: False)]

Review Comment:
   What is the `Depends(lambda: False)` part for? I guess it is to have a
   common default of `False`? But I think this will actually always return
   `False` (there's no `Query()` involved).
   
   Usually we would do this, but then we can't specify the default value.
   ```
   QueryIncludeUpstream = Annotated[bool, Query()]
   ```
   
   Maybe the easiest and most consistent way is to also use
   `filter_param_factory`? (The default value is properly handled, and `Query`
   too.) We just need to pass `.value` to the function.






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-05 Thread via GitHub


pierrejeambrun commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1871140549


##
airflow/api_fastapi/core_api/routes/ui/service/grid.py:
##
@@ -0,0 +1,205 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import operator
+from functools import cache
+
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.api_fastapi.common.parameters import (
+    BaseParam,
+    SortParam,
+)
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridTaskInstanceSummary,
+)
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+
+def get_dag_run_sort_param(dag: DAG) -> BaseParam:
+    """
+    Get the Sort Param for the DAG Run.
+
+    Data interval columns are NULL for runs created before 2.3, but SQL's
+    NULL-sorting logic would make those old runs always appear first. In a
+    perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+    not efficient, so instead if the run_ordering is data_interval_start or data_interval_end,
+    we sort by logical_date instead.
+
+    :param dag: DAG
+
+    :return: Sort Param
+    """
+    for name in dag.timetable.run_ordering:
+        if name in ("data_interval_start", "data_interval_end"):
+            return SortParam(
+                allowed_attrs=["logical_date", "data_interval_start", "data_interval_end"], model=DagRun

Review Comment:
   This could be factorized, and only the `set_value` could be in the `if/else` 
clause.
   
   ```python
   SortParam(
       allowed_attrs=["logical_date", "data_interval_start", "data_interval_end"], model=DagRun
   )
   ```
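
A rough sketch of the suggested refactoring, under stated assumptions: `FakeSortParam` is a hypothetical stand-in that only mimics a `set_value()` returning the parameter itself, and the fallback branch is a guess, since the hunk above is cut off before the `else` case.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeSortParam:
    """Hypothetical stand-in for SortParam; only mimics set_value()."""

    allowed_attrs: list
    value: Optional[str] = None

    def set_value(self, value: str) -> "FakeSortParam":
        if value not in self.allowed_attrs:
            raise ValueError(f"ordering by {value!r} is not allowed")
        self.value = value
        return self


def get_dag_run_sort_param(run_ordering):
    # Built once, outside the branch, as the review comment suggests.
    sort_param = FakeSortParam(
        allowed_attrs=["logical_date", "data_interval_start", "data_interval_end"]
    )
    for name in run_ordering:
        if name in ("data_interval_start", "data_interval_end"):
            # Per the quoted docstring: sort by logical_date instead.
            return sort_param.set_value("logical_date")
    # Assumption: otherwise keep the first column the timetable asked for.
    return sort_param.set_value(run_ordering[0])


interval_case = get_dag_run_sort_param(("data_interval_end", "logical_date"))
plain_case = get_dag_run_sort_param(("logical_date",))
```

Only the `set_value` argument differs between the branches, so the constructor arguments are written in one place.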






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-05 Thread via GitHub


pierrejeambrun commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1871155810


##
airflow/api_fastapi/common/parameters.py:
##
@@ -572,3 +636,7 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
 QueryAssetDagIdPatternSearch = Annotated[
     _DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
 ]
+
+# UI Shared
+QueryIncludeUpstream = Annotated[Union[bool, None], Depends(lambda: False)]
+QueryIncludeDownstream = Annotated[Union[bool, None], Depends(lambda: False)]

Review Comment:
   What is the `Depends(lambda: False)` part for? I guess it is to have a
   common default of `False`? But I think this will actually always return
   `False`.
   
   Usually we would do this, but then we can't specify the default value.
   ```
   QueryIncludeUpstream = Annotated[bool, Query()]
   ```
   
   Maybe the easiest and most consistent way is to also use
   `filter_param_factory`? (The default value is properly handled, and `Query`
   too.) We just need to pass `.value` to the function.



##
airflow/api_fastapi/core_api/routes/ui/service/grid.py:
##
@@ -0,0 +1,205 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import operator
+from functools import cache
+
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.api_fastapi.common.parameters import (
+    BaseParam,
+    SortParam,
+)
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridTaskInstanceSummary,
+)
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+
+def get_dag_run_sort_param(dag: DAG) -> BaseParam:
+    """
+    Get the Sort Param for the DAG Run.
+
+    Data interval columns are NULL for runs created before 2.3, but SQL's
+    NULL-sorting logic would make those old runs always appear first. In a
+    perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+    not efficient, so instead if the run_ordering is data_interval_start or data_interval_end,
+    we sort by logical_date instead.
+
+    :param dag: DAG
+
+    :return: Sort Param
+    """
+    for name in dag.timetable.run_ordering:
+        if name in ("data_interval_start", "data_interval_end"):
+            return SortParam(
+                allowed_attrs=["logical_date", "data_interval_start", "data_interval_end"], model=DagRun

Review Comment:
   This could be in common, and only the `set_value` could be in the `if/else` 
clause.
   
   ```python
   SortParam(
       allowed_attrs=["logical_date", "data_interval_start", "data_interval_end"], model=DagRun
   )
   ```



##
airflow/api_fastapi/core_api/routes/ui/service/grid.py:
##


Review Comment:
   My bad for not being super explicit. It should be `services`.
   
   It is a separate layer alongside `routes` (view, presentation) and
   `datamodels` (serializers): `services` holds business / common application
   logic.
   
   So I think `core_api/services/ui/grid.py` would be good.



##
airflow/api_fastapi/common/parameters.py:
##
@@ -112,6 +112,69 @@ def depends(self, only_active: bool = True) -> _OnlyActiveFilter:
         return self.set_value(only_active)
 
 
+class DagIdsFilter(BaseParam[list[str]]):
+    """Filter on dag ids."""
+
+    def __init__(self, model: Base, value: list[str] | None = None, skip_none: bool = True) -> None:
+        super().__init__(value, skip_none)
+        self.model = model
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value and self.skip_none:

Review Comment:
   Can't we use the `filter_param_factory` pattern for those filters ?




Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-04 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1870420114


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,324 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+import operator
+from functools import cache
+
+from fastapi import HTTPException, Request, status
+from sqlalchemy import func, select
+from sqlalchemy.sql.operators import ColumnOperators
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+    GridTaskInstanceSummary,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator, TaskInstance
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils import timezone
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    base_date: OptionalDateTimeQuery = None,
+    root: str | None = None,
+    filter_upstream: bool = False,
+    filter_downstream: bool = False,
+) -> GridResponse:
+    """Return grid data."""
+    ## Database calls to retrieve the DAG Runs and Task Instances and validate the data
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    if root:
+        dag = dag.partial_subset(
+            task_ids_or_regex=root, include_upstream=filter_upstream, include_downstream=filter_downstream
+        )
+
+    current_time = timezone.utcnow()
+    # Retrieve, sort and encode the previous DAG Runs
+    base_query = (
+        select(
+            DagRun.run_id,
+            DagRun.queued_at,
+            DagRun.start_date,
+            DagRun.end_date,
+            DagRun.state,
+            DagRun.run_type,
+            DagRun.data_interval_start,
+            DagRun.data_interval_end,
+            DagRun.dag_version_id.label("version_number"),
+        )
+        .select_from(DagRun)
+        .where(DagRun.dag_id == dag.dag_id, DagRun.logical_date <= func.coalesce(base_date, current_time))
+        .order_by(DagRun.id.desc())
+    )
+
+    def get_dag_run_sort_param():
+        """Get the Sort Param for the DAG Run."""
+
+        def _get_run_ordering_expr(name: str) -> ColumnOperators:
+            """Get the Run Ordering Expression."""
+            expr = DagRun.__mapper__.columns[name]
+            # Data interval columns are NULL for runs created before 2.3, but SQL's
+            # NULL-sorting logic would make those old runs always appear first. In a
+            # perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+            # not efficient, so instead the columns are coalesced into logical_date,
+            # which is good enough in most cases.
+            if name in ("data_interval_start", "data_interval_end"):
+                expr = func.coalesce(expr, DagRun.logical_date)
+            return expr.desc()
+
+        ordering_expression = (_get_run_ordering_expr(name) for name in dag.timetable.run_ordering)
+
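
The run-ordering comment in the quoted diff can be tried out in isolation. The sketch below is only an illustration under stated assumptions: it uses the stdlib `sqlite3` module and a hypothetical three-column `dag_run` table (not Airflow's real schema), and `ordered_run_ids` is a made-up helper name. Where NULLs land is backend-specific: SQLite places them last under `DESC`, while PostgreSQL places them first by default, which is the case the comment describes.

```python
import sqlite3


def ordered_run_ids(coalesce: bool) -> list[str]:
    """Return run ids sorted newest-first, optionally coalescing NULL intervals."""
    conn = sqlite3.connect(":memory:")
    # Hypothetical, simplified table; Airflow's dag_run table has many more columns.
    conn.execute("CREATE TABLE dag_run (run_id TEXT, data_interval_end TEXT, logical_date TEXT)")
    conn.executemany(
        "INSERT INTO dag_run VALUES (?, ?, ?)",
        [
            ("run_a", "2024-11-01", "2024-11-01"),
            # A run without a stored data interval (NULL column).
            ("run_legacy", None, "2024-11-15"),
            ("run_b", "2024-11-30", "2024-11-30"),
        ],
    )
    # Fall back to logical_date when the interval column is NULL.
    order_expr = "COALESCE(data_interval_end, logical_date)" if coalesce else "data_interval_end"
    rows = conn.execute(f"SELECT run_id FROM dag_run ORDER BY {order_expr} DESC").fetchall()
    conn.close()
    return [row[0] for row in rows]


if __name__ == "__main__":
    print(ordered_run_ids(coalesce=False))
    print(ordered_run_ids(coalesce=True))
```

With the coalesced expression the NULL-interval run is placed by its logical date instead of being pushed to one end of the result.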

Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-04 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1870420114


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,324 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+import operator
+from functools import cache
+
+from fastapi import HTTPException, Request, status
+from sqlalchemy import func, select
+from sqlalchemy.sql.operators import ColumnOperators
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+    GridTaskInstanceSummary,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator, TaskInstance
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils import timezone
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    base_date: OptionalDateTimeQuery = None,
+    root: str | None = None,
+    filter_upstream: bool = False,
+    filter_downstream: bool = False,
+) -> GridResponse:
+    """Return grid data."""
+    ## Database calls to retrieve the DAG Runs and Task Instances and validate the data
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    if root:
+        dag = dag.partial_subset(
+            task_ids_or_regex=root, include_upstream=filter_upstream, include_downstream=filter_downstream
+        )
+
+    current_time = timezone.utcnow()
+    # Retrieve, sort and encode the previous DAG Runs
+    base_query = (
+        select(
+            DagRun.run_id,
+            DagRun.queued_at,
+            DagRun.start_date,
+            DagRun.end_date,
+            DagRun.state,
+            DagRun.run_type,
+            DagRun.data_interval_start,
+            DagRun.data_interval_end,
+            DagRun.dag_version_id.label("version_number"),
+        )
+        .select_from(DagRun)
+        .where(DagRun.dag_id == dag.dag_id, DagRun.logical_date <= func.coalesce(base_date, current_time))
+        .order_by(DagRun.id.desc())
+    )
+
+    def get_dag_run_sort_param():
+        """Get the Sort Param for the DAG Run."""
+
+        def _get_run_ordering_expr(name: str) -> ColumnOperators:
+            """Get the Run Ordering Expression."""
+            expr = DagRun.__mapper__.columns[name]
+            # Data interval columns are NULL for runs created before 2.3, but SQL's
+            # NULL-sorting logic would make those old runs always appear first. In a
+            # perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+            # not efficient, so instead the columns are coalesced into logical_date,
+            # which is good enough in most cases.
+            if name in ("data_interval_start", "data_interval_end"):
+                expr = func.coalesce(expr, DagRun.logical_date)
+            return expr.desc()
+
+        ordering_expression = (_get_run_ordering_expr(name) for name in dag.timetable.run_ordering)
+

Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-04 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1870419878


##
tests/api_fastapi/core_api/routes/ui/test_grid.py:
##
@@ -0,0 +1,414 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import timedelta
+
+import pendulum
+import pytest
+
+from airflow.decorators import task_group
+from airflow.models import DagBag
+from airflow.operators.empty import EmptyOperator
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.state import DagRunState, TaskInstanceState
+from airflow.utils.task_group import TaskGroup
+from airflow.utils.types import DagRunType
+
+from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.db import clear_db_assets, clear_db_dags, clear_db_runs, clear_db_serialized_dags
+from tests_common.test_utils.mock_operators import MockOperator
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.utils.types import DagRunTriggeredByType
+
+pytestmark = pytest.mark.db_test
+
+DAG_ID = "test_dag"
+DAG_ID_2 = "test_dag_2"
+TASK_ID = "task"
+TASK_ID_2 = "task2"
+
+
+@pytest.fixture(autouse=True, scope="module")
+def examples_dag_bag():
+    # Speed up: We don't want example dags for this module
+    return DagBag(include_examples=False, read_dags_from_db=True)
+
+
+@pytest.fixture(autouse=True)
+@provide_session
+def setup(dag_maker, session=None):
+    clear_db_runs()
+    clear_db_dags()
+    clear_db_serialized_dags()
+
+    with dag_maker(dag_id=DAG_ID, serialized=True, session=session) as dag:
+        EmptyOperator(task_id=TASK_ID)
+
+        @task_group
+        def mapped_task_group(arg1):
+            return MockOperator(task_id="subtask", arg1=arg1)
+
+        mapped_task_group.expand(arg1=["a", "b", "c"])
+        with TaskGroup(group_id="task_group"):
+            MockOperator.partial(task_id="mapped_task").expand(arg1=["a", "b", "c", "d"])
+
+    triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
+    logical_date = timezone.datetime(2024, 11, 30)

Review Comment:
   Indeed, I missed it while including a fix for the caplog. Removed.






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-04 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1870419490


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,324 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+import operator
+from functools import cache
+
+from fastapi import HTTPException, Request, status
+from sqlalchemy import func, select
+from sqlalchemy.sql.operators import ColumnOperators
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+    GridTaskInstanceSummary,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator, TaskInstance
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils import timezone
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    base_date: OptionalDateTimeQuery = None,
+    root: str | None = None,
+    filter_upstream: bool = False,
+    filter_downstream: bool = False,
+) -> GridResponse:
+    """Return grid data."""
+    ## Database calls to retrieve the DAG Runs and Task Instances and validate the data
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    if root:
+        dag = dag.partial_subset(
+            task_ids_or_regex=root, include_upstream=filter_upstream, include_downstream=filter_downstream
+        )
+
+    current_time = timezone.utcnow()
+    # Retrieve, sort and encode the previous DAG Runs
+    base_query = (
+        select(
+            DagRun.run_id,
+            DagRun.queued_at,
+            DagRun.start_date,
+            DagRun.end_date,
+            DagRun.state,
+            DagRun.run_type,
+            DagRun.data_interval_start,
+            DagRun.data_interval_end,
+            DagRun.dag_version_id.label("version_number"),
+        )
+        .select_from(DagRun)
+        .where(DagRun.dag_id == dag.dag_id, DagRun.logical_date <= func.coalesce(base_date, current_time))
+        .order_by(DagRun.id.desc())
+    )
+
+    def get_dag_run_sort_param():
+        """Get the Sort Param for the DAG Run."""
+
+        def _get_run_ordering_expr(name: str) -> ColumnOperators:
+            """Get the Run Ordering Expression."""
+            expr = DagRun.__mapper__.columns[name]
+            # Data interval columns are NULL for runs created before 2.3, but SQL's
+            # NULL-sorting logic would make those old runs always appear first. In a
+            # perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+            # not efficient, so instead the columns are coalesced into logical_date,
+            # which is good enough in most cases.
+            if name in ("data_interval_start", "data_interval_end"):
+                expr = func.coalesce(expr, DagRun.logical_date)
+            return expr.desc()
+
+        ordering_expression = (_get_run_ordering_expr(name) for name in dag.timetable.run_ordering)
+

Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-04 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1870419045


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,324 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+import operator
+from functools import cache
+
+from fastapi import HTTPException, Request, status
+from sqlalchemy import func, select
+from sqlalchemy.sql.operators import ColumnOperators
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+    GridTaskInstanceSummary,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator, TaskInstance
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils import timezone
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    base_date: OptionalDateTimeQuery = None,
+    root: str | None = None,
+    filter_upstream: bool = False,
+    filter_downstream: bool = False,
+) -> GridResponse:
+    """Return grid data."""
+    ## Database calls to retrieve the DAG Runs and Task Instances and validate the data

Review Comment:
   Removed; indeed, it is not.






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-04 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1870414431


##
airflow/ui/openapi-gen/requests/types.gen.ts:
##
@@ -683,6 +683,45 @@ export type GraphDataResponse = {
 
 export type arrange = "BT" | "LR" | "RL" | "TB";
 
+/**
+ * DAG Run model for the Grid UI with Task Instances.
+ */
+export type GridDAGRunwithTIs = {
+  run_id: string;
+  queued_at: string | null;
+  start_date: string | null;
+  end_date: string | null;
+  state: string;
+  run_type: string;
+  data_interval_start: string | null;
+  data_interval_end: string | null;
+  version_number: string | null;
+  task_instances: Array<GridTaskInstanceSummary> | null;

Review Comment:
   Adjusted as it won't be `null` in any case
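
   As a rough illustration of this kind of adjustment (not the actual Pydantic datamodel from the PR), a list field can default to an empty list instead of being optional, so consumers never need a `null` check. `GridRunSketch`, `TaskInstanceSketch` and `task_ids` are hypothetical names built on stdlib dataclasses.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class TaskInstanceSketch:
    """Hypothetical stand-in for a task instance summary."""

    task_id: str
    state: str | None = None


@dataclass
class GridRunSketch:
    """Hypothetical stand-in for a grid DAG run carrying its task instances."""

    run_id: str
    # Always a list: empty when the run has no task instances, never None.
    task_instances: list[TaskInstanceSketch] = field(default_factory=list)


def task_ids(run: GridRunSketch) -> list[str]:
    """Collect task ids without needing a None check on the list."""
    return [ti.task_id for ti in run.task_instances]


if __name__ == "__main__":
    print(task_ids(GridRunSketch(run_id="run_1")))
    print(task_ids(GridRunSketch("run_2", [TaskInstanceSketch("task")])))
```

   Using `default_factory=list` gives each instance its own empty list, which avoids sharing one mutable default across instances.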






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-04 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1870418798


##
airflow/ui/openapi-gen/requests/types.gen.ts:
##
@@ -683,6 +683,45 @@ export type GraphDataResponse = {
 
 export type arrange = "BT" | "LR" | "RL" | "TB";
 
+/**
+ * DAG Run model for the Grid UI with Task Instances.
+ */
+export type GridDAGRunwithTIs = {
+  run_id: string;
+  queued_at: string | null;
+  start_date: string | null;
+  end_date: string | null;
+  state: string;
+  run_type: string;
+  data_interval_start: string | null;
+  data_interval_end: string | null;
+  version_number: string | null;
+  task_instances: Array<GridTaskInstanceSummary> | null;
+};
+
+/**
+ * Response model for the Grid UI.
+ */
+export type GridResponse = {
+  dag_runs: Array<GridDAGRunwithTIs>;
+};

Review Comment:
   Removed and merged into one model






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-04 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1870418616


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,324 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+import operator
+from functools import cache
+
+from fastapi import HTTPException, Request, status
+from sqlalchemy import func, select
+from sqlalchemy.sql.operators import ColumnOperators
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+    GridTaskInstanceSummary,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator, TaskInstance
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils import timezone
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    base_date: OptionalDateTimeQuery = None,
+    root: str | None = None,
+    filter_upstream: bool = False,
+    filter_downstream: bool = False,
+) -> GridResponse:
+    """Return grid data."""
+    ## Database calls to retrieve the DAG Runs and Task Instances and validate the data
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    if root:
+        dag = dag.partial_subset(
+            task_ids_or_regex=root, include_upstream=filter_upstream, include_downstream=filter_downstream
+        )
+
+    current_time = timezone.utcnow()
+    # Retrieve, sort and encode the previous DAG Runs
+    base_query = (
+        select(
+            DagRun.run_id,
+            DagRun.queued_at,
+            DagRun.start_date,
+            DagRun.end_date,
+            DagRun.state,
+            DagRun.run_type,
+            DagRun.data_interval_start,
+            DagRun.data_interval_end,
+            DagRun.dag_version_id.label("version_number"),
+        )
+        .select_from(DagRun)
+        .where(DagRun.dag_id == dag.dag_id, DagRun.logical_date <= func.coalesce(base_date, current_time))
+        .order_by(DagRun.id.desc())
+    )
+
+    def get_dag_run_sort_param():
+        """Get the Sort Param for the DAG Run."""
+
+        def _get_run_ordering_expr(name: str) -> ColumnOperators:
+            """Get the Run Ordering Expression."""
+            expr = DagRun.__mapper__.columns[name]
+            # Data interval columns are NULL for runs created before 2.3, but SQL's
+            # NULL-sorting logic would make those old runs always appear first. In a
+            # perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+            # not efficient, so instead the columns are coalesced into logical_date,
+            # which is good enough in most cases.
+            if name in ("data_interval_start", "data_interval_end"):
+                expr = func.coalesce(expr, DagRun.logical_date)
+            return expr.desc()
+
+        ordering_expression = (_get_run_ordering_expr(name) for name in dag.timetable.run_ordering)
+

Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-04 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1870413775


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,324 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+import operator
+from functools import cache
+
+from fastapi import HTTPException, Request, status
+from sqlalchemy import func, select
+from sqlalchemy.sql.operators import ColumnOperators
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+    GridTaskInstanceSummary,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator, TaskInstance
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils import timezone
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    base_date: OptionalDateTimeQuery = None,
+    root: str | None = None,
+    filter_upstream: bool = False,
+    filter_downstream: bool = False,
+) -> GridResponse:
+    """Return grid data."""
+    ## Database calls to retrieve the DAG Runs and Task Instances and validate the data
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    if root:
+        dag = dag.partial_subset(
+            task_ids_or_regex=root, include_upstream=filter_upstream, include_downstream=filter_downstream
+        )
+
+    current_time = timezone.utcnow()
+    # Retrieve, sort and encode the previous DAG Runs
+    base_query = (
+        select(
+            DagRun.run_id,
+            DagRun.queued_at,
+            DagRun.start_date,
+            DagRun.end_date,
+            DagRun.state,
+            DagRun.run_type,
+            DagRun.data_interval_start,
+            DagRun.data_interval_end,
+            DagRun.dag_version_id.label("version_number"),
+        )
+        .select_from(DagRun)
+        .where(DagRun.dag_id == dag.dag_id, DagRun.logical_date <= func.coalesce(base_date, current_time))
+        .order_by(DagRun.id.desc())
+    )
+
+    def get_dag_run_sort_param():
+        """Get the Sort Param for the DAG Run."""
+
+        def _get_run_ordering_expr(name: str) -> ColumnOperators:
+            """Get the Run Ordering Expression."""
+            expr = DagRun.__mapper__.columns[name]
+            # Data interval columns are NULL for runs created before 2.3, but SQL's
+            # NULL-sorting logic would make those old runs always appear first. In a
+            # perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+            # not efficient, so instead the columns are coalesced into logical_date,
+            # which is good enough in most cases.
+            if name in ("data_interval_start", "data_interval_end"):
+                expr = func.coalesce(expr, DagRun.logical_date)
+            return expr.desc()
+
+        ordering_expression = (_get_run_ordering_expr(name) for name in dag.timetable.run_ordering)
+

Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-04 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1870413033


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,324 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+import operator
+from functools import cache
+
+from fastapi import HTTPException, Request, status
+from sqlalchemy import func, select
+from sqlalchemy.sql.operators import ColumnOperators
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+    GridTaskInstanceSummary,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator, TaskInstance
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils import timezone
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    base_date: OptionalDateTimeQuery = None,
+    root: str | None = None,
+    filter_upstream: bool = False,
+    filter_downstream: bool = False,
+) -> GridResponse:
+    """Return grid data."""
+    ## Database calls to retrieve the DAG Runs and Task Instances and validate the data
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    if root:
+        dag = dag.partial_subset(
+            task_ids_or_regex=root, include_upstream=filter_upstream, include_downstream=filter_downstream
+        )
+
+    current_time = timezone.utcnow()
+    # Retrieve, sort and encode the previous DAG Runs
+    base_query = (
+        select(
+            DagRun.run_id,
+            DagRun.queued_at,
+            DagRun.start_date,
+            DagRun.end_date,
+            DagRun.state,
+            DagRun.run_type,
+            DagRun.data_interval_start,
+            DagRun.data_interval_end,
+            DagRun.dag_version_id.label("version_number"),
+        )
+        .select_from(DagRun)
+        .where(DagRun.dag_id == dag.dag_id, DagRun.logical_date <= func.coalesce(base_date, current_time))
+        .order_by(DagRun.id.desc())
+    )
+
+    def get_dag_run_sort_param():
+        """Get the Sort Param for the DAG Run."""
+
+        def _get_run_ordering_expr(name: str) -> ColumnOperators:
+            """Get the Run Ordering Expression."""
+            expr = DagRun.__mapper__.columns[name]
+            # Data interval columns are NULL for runs created before 2.3, but SQL's
+            # NULL-sorting logic would make those old runs always appear first. In a
+            # perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+            # not efficient, so instead the columns are coalesced into logical_date,
+            # which is good enough in most cases.
+            if name in ("data_interval_start", "data_interval_end"):
+                expr = func.coalesce(expr, DagRun.logical_date)
+            return expr.desc()
+
+        ordering_expression = (_get_run_ordering_expr(name) for name in dag.timetable.run_ordering)
+
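
The code comment in the diff above explains why the data interval columns are coalesced into `logical_date` before sorting. A minimal sketch of that effect follows, using an in-memory SQLite table as a stand-in; the table layout, the sample rows, and the `run_ids` helper are assumptions made for illustration and are not taken from Airflow. Where NULL lands depends on the backend (SQLite places it last under `DESC`, some other databases place it first), but in either case the row without a data interval falls out of chronological position unless the column is coalesced.

```python
import sqlite3

# Hypothetical stand-in for the dag_run table; only the columns needed here.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run (run_id TEXT, logical_date TEXT, data_interval_end TEXT)"
)
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?)",
    [
        ("a", "2021-06-01", "2021-06-02"),
        ("b", "2022-06-01", None),  # row with no data interval recorded
        ("c", "2023-06-01", "2023-06-02"),
    ],
)


def run_ids(order_by: str) -> list[str]:
    """Return run ids ordered by the given (trusted, hard-coded) expression."""
    rows = conn.execute(f"SELECT run_id FROM dag_run ORDER BY {order_by}")
    return [row[0] for row in rows]


# Sorting on the raw column pushes the NULL row to one end of the result.
plain = run_ids("data_interval_end DESC")

# Coalescing into logical_date puts it back between its neighbours.
coalesced = run_ids("COALESCE(data_interval_end, logical_date) DESC")

print(plain)
print(coalesced)
```

With the coalesced expression, run `b` sorts between `c` and `a`, which matches its logical date.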

Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-04 Thread via GitHub


bugraoz93 commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1870412791


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,324 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+import operator
+from functools import cache
+
+from fastapi import HTTPException, Request, status
+from sqlalchemy import func, select
+from sqlalchemy.sql.operators import ColumnOperators
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+    GridTaskInstanceSummary,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator, TaskInstance
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils import timezone
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    base_date: OptionalDateTimeQuery = None,
+    root: str | None = None,
+    filter_upstream: bool = False,
+    filter_downstream: bool = False,

Review Comment:
   Moved into parameters.py






Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-03 Thread via GitHub


bugraoz93 commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2515049790

   > Great work! This will power so much of the new UI!
   > 
   > In the old UI we also included the note for both dag runs and task instances. We should still include it for the new UI.
   
   I was writing my message and saw your review just now :slightly_smiling_face: Thanks for the review, Brent!





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-03 Thread via GitHub


bugraoz93 commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2515046251

   > Great.
   > 
   > Looking good, just a few suggestions / improvements on the code, but nothing blocking.
   > 
   > I'll also let Brent double check that the interface corresponds to the UI specifications.
   
   That sounds amazing! Let me know if I have missed anything, and we can fix it at an early stage.
   I will review the comments and adjust the code accordingly in the coming hours. Many thanks for the review and the comments!





Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-03 Thread via GitHub


bbovenzi commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1867988633


##
airflow/ui/openapi-gen/requests/types.gen.ts:
##
@@ -683,6 +683,45 @@ export type GraphDataResponse = {
 
 export type arrange = "BT" | "LR" | "RL" | "TB";
 
+/**
+ * DAG Run model for the Grid UI with Task Instances.
+ */
+export type GridDAGRunwithTIs = {
+  run_id: string;
+  queued_at: string | null;
+  start_date: string | null;
+  end_date: string | null;
+  state: string;
+  run_type: string;
+  data_interval_start: string | null;
+  data_interval_end: string | null;
+  version_number: string | null;
+  task_instances: Array<GridTaskInstanceSummary> | null;
+};
+
+/**
+ * Response model for the Grid UI.
+ */
+export type GridResponse = {
+  dag_runs: Array<GridDAGRunwithTIs>;
+};

Review Comment:
   Let's remove a nesting step if possible: `export type GridResponse = Array<GridDAGRunwithTIs>`



##
airflow/ui/openapi-gen/requests/types.gen.ts:
##
@@ -683,6 +683,45 @@ export type GraphDataResponse = {
 
 export type arrange = "BT" | "LR" | "RL" | "TB";
 
+/**
+ * DAG Run model for the Grid UI with Task Instances.
+ */
+export type GridDAGRunwithTIs = {
+  run_id: string;
+  queued_at: string | null;
+  start_date: string | null;
+  end_date: string | null;
+  state: string;
+  run_type: string;
+  data_interval_start: string | null;
+  data_interval_end: string | null;
+  version_number: string | null;
+  task_instances: Array<GridTaskInstanceSummary> | null;

Review Comment:
   It would be great if this were never `null` and were simply an empty array when there is no information.
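
   One way to get that behaviour is to give the field an empty-list default instead of an optional type. The sketch below uses standard-library dataclasses as a stand-in for the real response models; the class names `TaskInstanceSummary` and `DagRunWithTIs` are hypothetical and only mirror the shape of the generated types above. Pydantic offers an analogous `Field(default_factory=list)`, though whether the PR adopts it is not stated here.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, field


@dataclass
class TaskInstanceSummary:
    """Hypothetical stand-in for a per-task summary entry."""

    task_id: str
    state: str | None = None


@dataclass
class DagRunWithTIs:
    """Hypothetical stand-in for a DAG run carrying its task instances."""

    run_id: str
    # default_factory gives each instance its own fresh list, so the field
    # is never None and instances never share one mutable default.
    task_instances: list[TaskInstanceSummary] = field(default_factory=list)


empty_run = DagRunWithTIs(run_id="manual__1")
other_run = DagRunWithTIs(run_id="manual__2")
other_run.task_instances.append(TaskInstanceSummary(task_id="extract"))

payload = asdict(empty_run)
print(payload)
```

   On the client side this lets the UI iterate over `task_instances` without a null check.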



##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,324 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+import operator
+from functools import cache
+
+from fastapi import HTTPException, Request, status
+from sqlalchemy import func, select
+from sqlalchemy.sql.operators import ColumnOperators
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+    GridTaskInstanceSummary,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator, TaskInstance
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils import timezone
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    base_date: OptionalDateTimeQuery = None,
+    root: str | None = None,
+    filter_upstream: bool = False,
+    filter_downstream: bool = False,

Review Comment:
   +1 on moving to a shared param since the grid and graph data endpoints will 
be used together in the UI so often.




Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-03 Thread via GitHub


pierrejeambrun commented on code in PR #44332:
URL: https://github.com/apache/airflow/pull/44332#discussion_r1867941494


##
airflow/api_fastapi/core_api/routes/ui/grid.py:
##
@@ -0,0 +1,324 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections
+import itertools
+import operator
+from functools import cache
+
+from fastapi import HTTPException, Request, status
+from sqlalchemy import func, select
+from sqlalchemy.sql.operators import ColumnOperators
+from typing_extensions import Any
+
+from airflow import DAG
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
+    QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
+    QueryLimit,
+    QueryOffset,
+    SortParam,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.grid import (
+    GridDAGRunwithTIs,
+    GridResponse,
+    GridTaskInstanceSummary,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.models import DagRun, MappedOperator, TaskInstance
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils import timezone
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
+
+
+@grid_router.get(
+    "/{dag_id}",
+    include_in_schema=False,
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
+)
+def grid_data(
+    dag_id: str,
+    run_types: QueryDagRunRunTypesFilter,
+    run_states: QueryDagRunStateFilter,
+    session: SessionDep,
+    offset: QueryOffset,
+    request: Request,
+    num_runs: QueryLimit,
+    base_date: OptionalDateTimeQuery = None,
+    root: str | None = None,
+    filter_upstream: bool = False,
+    filter_downstream: bool = False,

Review Comment:
   For `graph_data` I renamed those to `include_upstream` and `include_downstream`, which I think are more intuitive.
   
   Here `filter_upstream=False` is passed straight through as `include_upstream=False`, so no upstream tasks are returned, even though the caller asked not to filter upstream.
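
   To make the naming issue concrete, here is a toy sketch of what the two flags select. It is not Airflow's `partial_subset` implementation; the `EDGES` graph and the `subset` and `_reachable` helpers are hypothetical and exist only to show the semantics of "include".

```python
from __future__ import annotations

from typing import Callable, Iterable

# Hypothetical three-task DAG: task id -> ids of its direct downstream tasks.
EDGES: dict[str, list[str]] = {
    "extract": ["transform"],
    "transform": ["load"],
    "load": [],
}


def _reachable(start: str, neighbours: Callable[[str], Iterable[str]]) -> set[str]:
    """Collect every task reachable from start by following neighbours."""
    seen: set[str] = set()
    stack = [start]
    while stack:
        node = stack.pop()
        for nxt in neighbours(node):
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen


def subset(root: str, include_upstream: bool = False, include_downstream: bool = False) -> set[str]:
    """Return root plus whichever side of the graph was asked for."""
    selected = {root}
    if include_downstream:
        selected |= _reachable(root, lambda n: EDGES[n])
    if include_upstream:
        selected |= _reachable(root, lambda n: [p for p, kids in EDGES.items() if n in kids])
    return selected


only_root = subset("transform")
with_upstream = subset("transform", include_upstream=True)
everything = subset("transform", include_upstream=True, include_downstream=True)
print(only_root, with_upstream, everything)
```

   With a parameter named `filter_upstream`, a caller passing `False` would reasonably expect the `with_upstream` result but would get `only_root`, which is the mismatch described above.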




Re: [PR] AIP-84 Migrate /object/grid_data from views to FastAPI [airflow]

2024-12-01 Thread via GitHub


bugraoz93 commented on PR #44332:
URL: https://github.com/apache/airflow/pull/44332#issuecomment-2509675556

   @pierrejeambrun this is ready for review. I removed the WIP earlier but forgot to ping you again :sweat_smile: Please take a look when you have time, thanks!

