Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-12 Thread via GitHub


vatsrahul1001 merged PR #51592:
URL: https://github.com/apache/airflow/pull/51592


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-12 Thread via GitHub


sunank200 commented on PR #51592:
URL: https://github.com/apache/airflow/pull/51592#issuecomment-2968889328

   > > Overall, looks good. Are other task handlers impacted by these typing 
changes? If so, probably can be addressed in a follow up as they'd be in 
providers...
   > 
   > IMO, we don't need to adapt TIH type hit for other provider-based task 
handlers, as most of them already adapted to RemoteIO interface ( which use 
`RuntimeTaskInstanceProtocol` as type annotation )
   
   I can do a follow-up PR if required.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-12 Thread via GitHub


jason810496 commented on PR #51592:
URL: https://github.com/apache/airflow/pull/51592#issuecomment-2968719084

   > Overall, looks good. Are other task handlers impacted by these typing 
changes? If so, probably can be addressed in a follow up as they'd be in 
providers...
   
   IMO, we don't need to adapt TIH type hit for other provider-based task 
handlers, as most of them already adapted to RemoteIO interface ( which use 
`RuntimeTaskInstanceProtocol` as type annotation )


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-11 Thread via GitHub


sunank200 commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2140528710


##
airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py:
##
@@ -112,6 +112,7 @@ def get_log(
 TaskInstance.dag_id == dag_id,
 TaskInstance.run_id == dag_run_id,
 TaskInstance.map_index == map_index,
+TaskInstance.try_number == try_number,

Review Comment:
   Yup. That's the reason.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-11 Thread via GitHub


jason810496 commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2140366141


##
airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py:
##
@@ -112,6 +112,7 @@ def get_log(
 TaskInstance.dag_id == dag_id,
 TaskInstance.run_id == dag_run_id,
 TaskInstance.map_index == map_index,
+TaskInstance.try_number == try_number,

Review Comment:
   Nice catch, I think this is the root cause of #50175 error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-11 Thread via GitHub


sunank200 commented on PR #51592:
URL: https://github.com/apache/airflow/pull/51592#issuecomment-2962195968

   > Related to #50175
   > 
   > The #50175 try to fix on FastAPI side instead of FileTaskHandler, but 
introduce other error ( [#50175 
(comment)](https://github.com/apache/airflow/pull/50175#issuecomment-2913881766)
 ).
   
   I have tested for the error mentioned in the issue. It works fine with this 
change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-11 Thread via GitHub


sunank200 commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2139721714


##
airflow-core/src/airflow/utils/log/file_task_handler.py:
##
@@ -180,15 +181,18 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
 last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(
+ti: TaskInstanceKey | TaskInstance | TaskInstanceHistory, session
+) -> TaskInstance | TaskInstanceHistory:
 """
-Given TI | TIKey, return a TI object.
+Return a TaskInstance or TaskInstanceHistory for the given TI, TIKey, or 
TaskInstanceHistory.
 
-Will raise exception if no TI is found in the database.
+Raises AirflowException if no TI is found in the database.
 """
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
-if isinstance(ti, TaskInstance):
+if isinstance(ti, (TaskInstance, TaskInstanceHistory)):
 return ti
 val = (

Review Comment:
   I have removed `_ensure_ti` helper and accepted `TaskInstanceHistory` in log 
handlers and reader.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-10 Thread via GitHub


jedcunningham commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2138712529


##
airflow-core/src/airflow/utils/log/file_task_handler.py:
##
@@ -180,15 +181,18 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
 last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(
+ti: TaskInstanceKey | TaskInstance | TaskInstanceHistory, session
+) -> TaskInstance | TaskInstanceHistory:
 """
-Given TI | TIKey, return a TI object.
+Return a TaskInstance or TaskInstanceHistory for the given TI, TIKey, or 
TaskInstanceHistory.
 
-Will raise exception if no TI is found in the database.
+Raises AirflowException if no TI is found in the database.
 """
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
-if isinstance(ti, TaskInstance):
+if isinstance(ti, (TaskInstance, TaskInstanceHistory)):
 return ti
 val = (

Review Comment:
   So, looking at this a bit closer, I'm not sure we even need this function 
any longer. We will always get a ti or tih to this. And typing through the 
whole process is also wrong, e.g. 
[read_log_stream](https://github.com/apache/airflow/blob/7ea915494a0065fb486567744fd63d60b7586070/airflow-core/src/airflow/utils/log/log_reader.py#L73)
 is typed to take ti, but also takes tih in practice.
   
   Can you take a pass at cleaning this up?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-10 Thread via GitHub


sunank200 commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2138649191


##
airflow-core/src/airflow/utils/log/file_task_handler.py:
##
@@ -180,15 +181,18 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
 last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(
+ti: TaskInstanceKey | TaskInstance | TaskInstanceHistory, session
+) -> TaskInstance | TaskInstanceHistory:
 """
-Given TI | TIKey, return a TI object.
+Return a TaskInstance or TaskInstanceHistory for the given TI, TIKey, or 
TaskInstanceHistory.
 
-Will raise exception if no TI is found in the database.
+Raises AirflowException if no TI is found in the database.
 """
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
-if isinstance(ti, TaskInstance):
+if isinstance(ti, (TaskInstance, TaskInstanceHistory)):
 return ti
 val = (

Review Comment:
   @jedcunningham  `_ensure_ti` doesn't find the right task instance or task 
instance history - it’s just a normalisation step inside the `FileTaskHandler` 
so that, no matter whether you passed in:
   1. latest TaskInstance
   2. TaskInstanceKey
   3.. TaskInstanceHistory
   
   You end up with an object that has `.id`, `.try_number`, and 
`.get_dagrun()`, which is all _render_filename needs.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-10 Thread via GitHub


sunank200 commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2138649191


##
airflow-core/src/airflow/utils/log/file_task_handler.py:
##
@@ -180,15 +181,18 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
 last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(
+ti: TaskInstanceKey | TaskInstance | TaskInstanceHistory, session
+) -> TaskInstance | TaskInstanceHistory:
 """
-Given TI | TIKey, return a TI object.
+Return a TaskInstance or TaskInstanceHistory for the given TI, TIKey, or 
TaskInstanceHistory.
 
-Will raise exception if no TI is found in the database.
+Raises AirflowException if no TI is found in the database.
 """
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
-if isinstance(ti, TaskInstance):
+if isinstance(ti, (TaskInstance, TaskInstanceHistory)):
 return ti
 val = (

Review Comment:
   @jedcunningham  `_ensure_ti` doesn't find the right task instance or task 
instance history - it’s just a normalisation step inside the `FileTaskHandler` 
so that, no matter whether you passed in:
   1. latest taskInstance
   2. taskInstanceKey
   3. taskInstanceHistory
   
   You end up with an object that has `.id`, `.try_number`, and 
`.get_dagrun()`, which is all _render_filename needs.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-10 Thread via GitHub


sunank200 commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2138649191


##
airflow-core/src/airflow/utils/log/file_task_handler.py:
##
@@ -180,15 +181,18 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
 last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(
+ti: TaskInstanceKey | TaskInstance | TaskInstanceHistory, session
+) -> TaskInstance | TaskInstanceHistory:
 """
-Given TI | TIKey, return a TI object.
+Return a TaskInstance or TaskInstanceHistory for the given TI, TIKey, or 
TaskInstanceHistory.
 
-Will raise exception if no TI is found in the database.
+Raises AirflowException if no TI is found in the database.
 """
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
-if isinstance(ti, TaskInstance):
+if isinstance(ti, (TaskInstance, TaskInstanceHistory)):
 return ti
 val = (

Review Comment:
   @jedcunningham  `_ensure_ti` isn’t meant to be a “find me the right row” 
function for both tables - it’s just a normalisation step inside the 
`FileTaskHandler` so that, no matter whether you passed in:
   1. latest TaskInstance
   2. TaskInstanceKey
   3.. TaskInstanceHistory
   
   You end up with an object that has `.id`, `.try_number`, and 
`.get_dagrun()`, which is all _render_filename needs.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-10 Thread via GitHub


sunank200 commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2138649191


##
airflow-core/src/airflow/utils/log/file_task_handler.py:
##
@@ -180,15 +181,18 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
 last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(
+ti: TaskInstanceKey | TaskInstance | TaskInstanceHistory, session
+) -> TaskInstance | TaskInstanceHistory:
 """
-Given TI | TIKey, return a TI object.
+Return a TaskInstance or TaskInstanceHistory for the given TI, TIKey, or 
TaskInstanceHistory.
 
-Will raise exception if no TI is found in the database.
+Raises AirflowException if no TI is found in the database.
 """
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
-if isinstance(ti, TaskInstance):
+if isinstance(ti, (TaskInstance, TaskInstanceHistory)):
 return ti
 val = (

Review Comment:
   @jedcunningham  `_ensure_ti` isn’t meant to be a “find me the right row” 
function for both tables - it’s just a normalisation step inside the 
`FileTaskHandler` so that, no matter whether you passed in:
   1. a latest TaskInstance
   2. a TaskInstanceKey
   3. a TaskInstanceHistory
   
   You end up with an object that has `.id`, `.try_number`, and 
`.get_dagrun()`, which is all _render_filename needs.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-10 Thread via GitHub


sunank200 commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2138672317


##
airflow-core/src/airflow/utils/log/file_task_handler.py:
##
@@ -180,15 +181,18 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
 last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(
+ti: TaskInstanceKey | TaskInstance | TaskInstanceHistory, session
+) -> TaskInstance | TaskInstanceHistory:
 """
-Given TI | TIKey, return a TI object.
+Return a TaskInstance or TaskInstanceHistory for the given TI, TIKey, or 
TaskInstanceHistory.
 
-Will raise exception if no TI is found in the database.
+Raises AirflowException if no TI is found in the database.
 """
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
-if isinstance(ti, TaskInstance):
+if isinstance(ti, (TaskInstance, TaskInstanceHistory)):
 return ti
 val = (

Review Comment:
   This code path ensures we only hit the TaskInstanceHistory table when 
there’s no latest TaskInstance for the requested try.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-10 Thread via GitHub


sunank200 commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2138651770


##
airflow-core/src/airflow/utils/log/file_task_handler.py:
##
@@ -180,15 +181,18 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
 last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(
+ti: TaskInstanceKey | TaskInstance | TaskInstanceHistory, session
+) -> TaskInstance | TaskInstanceHistory:
 """
-Given TI | TIKey, return a TI object.
+Return a TaskInstance or TaskInstanceHistory for the given TI, TIKey, or 
TaskInstanceHistory.
 
-Will raise exception if no TI is found in the database.
+Raises AirflowException if no TI is found in the database.
 """
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
-if isinstance(ti, TaskInstance):
+if isinstance(ti, (TaskInstance, TaskInstanceHistory)):
 return ti
 val = (

Review Comment:
   The actual fallback from latest TI → history TI lives up in the public-log 
route:
   - GET /logs first runs in 
https://github.com/apache/airflow/blob/67b722610559f0844cb11c7f8f9408cd4910/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py#L108
   - only if that returns None does it run 
   
https://github.com/apache/airflow/blob/67b722610559f0844cb11c7f8f9408cd4910/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py#L123
   
Once you’ve got either a latest TI or a TIHistory, you hand it to 
FileTaskHandler unchanged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-10 Thread via GitHub


sunank200 commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2138649191


##
airflow-core/src/airflow/utils/log/file_task_handler.py:
##
@@ -180,15 +181,18 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
 last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(
+ti: TaskInstanceKey | TaskInstance | TaskInstanceHistory, session
+) -> TaskInstance | TaskInstanceHistory:
 """
-Given TI | TIKey, return a TI object.
+Return a TaskInstance or TaskInstanceHistory for the given TI, TIKey, or 
TaskInstanceHistory.
 
-Will raise exception if no TI is found in the database.
+Raises AirflowException if no TI is found in the database.
 """
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
-if isinstance(ti, TaskInstance):
+if isinstance(ti, (TaskInstance, TaskInstanceHistory)):
 return ti
 val = (

Review Comment:
   @jedcunningham  `_ensure_ti` isn’t meant to be a “find me the right row” 
function for both tables - it’s just a normalisation step inside the 
`FileTaskHandler` so that, no matter whether you passed in:
   1. a live TaskInstance
   2. a TaskInstanceKey
   3. a TaskInstanceHistory
   You end up with an object that has `.id`, `.try_number`, and 
`.get_dagrun()`, which is all _render_filename needs.
   



##
airflow-core/src/airflow/utils/log/file_task_handler.py:
##
@@ -180,15 +181,18 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
 last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(
+ti: TaskInstanceKey | TaskInstance | TaskInstanceHistory, session
+) -> TaskInstance | TaskInstanceHistory:
 """
-Given TI | TIKey, return a TI object.
+Return a TaskInstance or TaskInstanceHistory for the given TI, TIKey, or 
TaskInstanceHistory.
 
-Will raise exception if no TI is found in the database.
+Raises AirflowException if no TI is found in the database.
 """
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
-if isinstance(ti, TaskInstance):
+if isinstance(ti, (TaskInstance, TaskInstanceHistory)):
 return ti
 val = (

Review Comment:
   @jedcunningham  `_ensure_ti` isn’t meant to be a “find me the right row” 
function for both tables - it’s just a normalisation step inside the 
`FileTaskHandler` so that, no matter whether you passed in:
   1. a live TaskInstance
   2. a TaskInstanceKey
   3. a TaskInstanceHistory
   
   You end up with an object that has `.id`, `.try_number`, and 
`.get_dagrun()`, which is all _render_filename needs.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-10 Thread via GitHub


dstandish commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2138485378


##
airflow-core/src/airflow/utils/log/file_task_handler.py:
##
@@ -180,15 +181,18 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
 last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(
+ti: TaskInstanceKey | TaskInstance | TaskInstanceHistory, session
+) -> TaskInstance | TaskInstanceHistory:
 """
-Given TI | TIKey, return a TI object.
+Return a TaskInstance or TaskInstanceHistory for the given TI, TIKey, or 
TaskInstanceHistory.
 
-Will raise exception if no TI is found in the database.
+Raises AirflowException if no TI is found in the database.
 """
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
-if isinstance(ti, TaskInstance):
+if isinstance(ti, (TaskInstance, TaskInstanceHistory)):
 return ti
 val = (

Review Comment:
   but yeah why is it setting try number 🤔 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-10 Thread via GitHub


dstandish commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2138483596


##
airflow-core/src/airflow/utils/log/file_task_handler.py:
##
@@ -180,15 +181,18 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
 last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(
+ti: TaskInstanceKey | TaskInstance | TaskInstanceHistory, session
+) -> TaskInstance | TaskInstanceHistory:
 """
-Given TI | TIKey, return a TI object.
+Return a TaskInstance or TaskInstanceHistory for the given TI, TIKey, or 
TaskInstanceHistory.
 
-Will raise exception if no TI is found in the database.
+Raises AirflowException if no TI is found in the database.
 """
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
-if isinstance(ti, TaskInstance):
+if isinstance(ti, (TaskInstance, TaskInstanceHistory)):
 return ti
 val = (

Review Comment:
   i suppose it depends on the context. in execution context, should be fine?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Support TaskInstanceHistory in log handlers [airflow]

2025-06-10 Thread via GitHub


jedcunningham commented on code in PR #51592:
URL: https://github.com/apache/airflow/pull/51592#discussion_r2138478586


##
airflow-core/src/airflow/utils/log/file_task_handler.py:
##
@@ -180,15 +181,18 @@ def _interleave_logs(*logs: str | LogMessages) -> 
Iterable[StructuredLogMessage]
 last = msg
 
 
-def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+def _ensure_ti(
+ti: TaskInstanceKey | TaskInstance | TaskInstanceHistory, session
+) -> TaskInstance | TaskInstanceHistory:
 """
-Given TI | TIKey, return a TI object.
+Return a TaskInstance or TaskInstanceHistory for the given TI, TIKey, or 
TaskInstanceHistory.
 
-Will raise exception if no TI is found in the database.
+Raises AirflowException if no TI is found in the database.
 """
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
-if isinstance(ti, TaskInstance):
+if isinstance(ti, (TaskInstance, TaskInstanceHistory)):
 return ti
 val = (

Review Comment:
   Is only querying TI the right behavior? Should this also fall back to 
history to get the right try? Just setting the try on the TI feels wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]