Re: [PR] Support Reschedule mode for `BaseSensorOperator` with Task SDK [airflow]

2025-04-05 Thread via GitHub


kaxil commented on code in PR #48193:
URL: https://github.com/apache/airflow/pull/48193#discussion_r2010704470


##
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py:
##
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Annotated
+from uuid import UUID
+
+from fastapi import HTTPException, Query, status
+from sqlalchemy import select
+
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.common.types import UtcDateTime
+from airflow.models.taskreschedule import TaskReschedule
+
+router = AirflowRouter(
+    responses={
+        status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
+        status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
+    },
+)
+
+
+@router.get("/{task_instance_id}/start_date")

Review Comment:
   I went through the same thought process.
   
   What's exposed is `task_reschedule_count`, which isn't enough. I needed some way to know `retries` in the API server to get the `try_number` for Task Reschedule.
   
   The options I had considered were:
   
   1) Passing `retries` in `TIEnterRunningPayload`
   2) Getting `retries` from the `serialized_dag` table
   3) A deferred (on-demand) way of getting this, as it is only required for sensors (the changes in this PR)
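
To illustrate why `try_number` matters for this lookup, here is a minimal sketch of the kind of query the start-date endpoint needs to answer. It is not the code from this PR: `RescheduleRow` and `first_reschedule_date` are hypothetical names, and an in-memory list stands in for the task reschedule table.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID, uuid4


@dataclass(frozen=True)
class RescheduleRow:
    """Hypothetical stand-in for one row of the task reschedule table."""

    ti_id: UUID
    try_number: int
    start_date: datetime


def first_reschedule_date(
    rows: list[RescheduleRow], ti_id: UUID, try_number: int
) -> datetime | None:
    """Return the earliest start_date for this task instance and try.

    Returns None when the task instance was never rescheduled on that try.
    """
    dates = [
        row.start_date
        for row in rows
        if row.ti_id == ti_id and row.try_number == try_number
    ]
    return min(dates, default=None)


# Usage: two reschedules on try 1, one on try 2 (made-up timestamps).
ti = uuid4()
rows = [
    RescheduleRow(ti, 1, datetime(2025, 3, 24, 10, 5, tzinfo=timezone.utc)),
    RescheduleRow(ti, 1, datetime(2025, 3, 24, 10, 0, tzinfo=timezone.utc)),
    RescheduleRow(ti, 2, datetime(2025, 3, 24, 11, 0, tzinfo=timezone.utc)),
]
first_for_try_1 = first_reschedule_date(rows, ti, 1)
```

Without the try number, rows from an earlier try would be mixed into the result, which is why the count alone is not enough.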
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support Reschedule mode for `BaseSensorOperator` with Task SDK [airflow]

2025-03-24 Thread via GitHub


kaxil commented on PR #48193:
URL: https://github.com/apache/airflow/pull/48193#issuecomment-2749142224

   The failures were unrelated to changes in this PR





Re: [PR] Support Reschedule mode for `BaseSensorOperator` with Task SDK [airflow]

2025-03-24 Thread via GitHub


kaxil merged PR #48193:
URL: https://github.com/apache/airflow/pull/48193





Re: [PR] Support Reschedule mode for `BaseSensorOperator` with Task SDK [airflow]

2025-03-24 Thread via GitHub


kaxil commented on code in PR #48193:
URL: https://github.com/apache/airflow/pull/48193#discussion_r2010706386


##
tests/sdk/__init__.py:
##


Review Comment:
   Yeah, the `create-missing-init-py-files-tests` pre-commit hook auto-added this.






Re: [PR] Support Reschedule mode for `BaseSensorOperator` with Task SDK [airflow]

2025-03-24 Thread via GitHub


kaxil commented on code in PR #48193:
URL: https://github.com/apache/airflow/pull/48193#discussion_r2010732232


##
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -361,6 +363,29 @@ def get_relevant_upstream_map_indexes(
         # TODO: Implement this method
         return None
 
+    def get_first_reschedule_date(self, try_number: int) -> datetime | None:

Review Comment:
   To get the elapsed duration for `timeouts` and to compute `next_poll_interval`:
   
   
https://github.com/apache/airflow/blob/b8b6c594ba3fbd03b24c6e8df6c708c2901041d2/airflow-core/src/airflow/sensors/base.py#L205-L213
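
As a rough sketch of that use (not the code at the link above), the first reschedule date serves as the sensor's effective start time, so the elapsed duration spans all reschedules rather than only the current poke. The function names here are hypothetical, and falling back to "now" when there is no reschedule yet is an assumption about the first poke.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def elapsed_since_first_poke(
    first_reschedule_date: datetime | None, now: datetime
) -> timedelta:
    """Elapsed sensor time measured from the first reschedule.

    Assumption: with no reschedule recorded yet, the sensor is on its
    first poke, so the elapsed time is zero.
    """
    started_at = first_reschedule_date or now
    return now - started_at


def has_timed_out(
    first_reschedule_date: datetime | None, now: datetime, timeout: timedelta
) -> bool:
    """True when the sensor has been waiting longer than `timeout`."""
    return elapsed_since_first_poke(first_reschedule_date, now) > timeout


# Usage: a sensor first rescheduled 90 minutes ago, with a 1-hour timeout.
now = datetime(2025, 3, 24, 12, 0, tzinfo=timezone.utc)
first = now - timedelta(minutes=90)
timed_out = has_timed_out(first, now, timedelta(hours=1))
```

The same elapsed duration could feed a poll-interval calculation, which is why both uses need the date.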
   






Re: [PR] Support Reschedule mode for `BaseSensorOperator` with Task SDK [airflow]

2025-03-24 Thread via GitHub


ashb commented on code in PR #48193:
URL: https://github.com/apache/airflow/pull/48193#discussion_r2010716417


##
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -361,6 +363,29 @@ def get_relevant_upstream_map_indexes(
         # TODO: Implement this method
         return None
 
+    def get_first_reschedule_date(self, try_number: int) -> datetime | None:

Review Comment:
   When is this called?






Re: [PR] Support Reschedule mode for `BaseSensorOperator` with Task SDK [airflow]

2025-03-24 Thread via GitHub


kaxil commented on code in PR #48193:
URL: https://github.com/apache/airflow/pull/48193#discussion_r2010709772


##
task-sdk/src/airflow/sdk/api/client.py:
##
@@ -454,6 +455,18 @@ def get_state(self, dag_id: str, run_id: str) -> DagRunStateResponse:
         return DagRunStateResponse.model_validate_json(resp.read())
 
 
+class TaskRescheduleOperations:

Review Comment:
   I considered that too, but since I created new API routes for Reschedule, I went with this. No strong reason for that either, so if you have a strong opinion I'm happy to change it.






Re: [PR] Support Reschedule mode for `BaseSensorOperator` with Task SDK [airflow]

2025-03-24 Thread via GitHub


kaxil commented on PR #48193:
URL: https://github.com/apache/airflow/pull/48193#issuecomment-2748992322

   In a follow-up PR, I will move the operator definition (and its tests) to the task-sdk package.





Re: [PR] Support Reschedule mode for `BaseSensorOperator` with Task SDK [airflow]

2025-03-24 Thread via GitHub


ashb commented on code in PR #48193:
URL: https://github.com/apache/airflow/pull/48193#discussion_r2010694944


##
task-sdk/src/airflow/sdk/api/client.py:
##
@@ -454,6 +455,18 @@ def get_state(self, dag_id: str, run_id: str) -> DagRunStateResponse:
         return DagRunStateResponse.model_validate_json(resp.read())
 
 
+class TaskRescheduleOperations:

Review Comment:
   I think this should be added to the existing TI Operations rather than creating a whole new one.



##
tests/sdk/__init__.py:
##


Review Comment:
   I don't think you meant to add these



##
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py:
##
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Annotated
+from uuid import UUID
+
+from fastapi import HTTPException, Query, status
+from sqlalchemy import select
+
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.common.types import UtcDateTime
+from airflow.models.taskreschedule import TaskReschedule
+
+router = AirflowRouter(
+    responses={
+        status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
+        status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
+    },
+)
+
+
+@router.get("/{task_instance_id}/start_date")

Review Comment:
   Isn't this sent in the run context already?
   
   Or could it be? I thought something like this was exposed in the context 


