Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]
mobuchowski merged PR #45294: URL: https://github.com/apache/airflow/pull/45294 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org

mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1950561783
## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ##
@@ -935,6 +937,7 @@ def supervise(
     Run a single task execution to completion.
     :param ti: The task instance to run.
+    :param dr: Current DagRun of the task instance.
Review Comment:
Fixed. Leftover from going through a few iterations.

uranusjr commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1950226981
## task_sdk/src/airflow/sdk/execution_time/supervisor.py: ##
@@ -935,6 +937,7 @@ def supervise(
     Run a single task execution to completion.
     :param ti: The task instance to run.
+    :param dr: Current DagRun of the task instance.
Review Comment:
This does not exist in the actual args? (Actually, `dag_path` below also does not exist; it's `dag_rel_path` in the args.)

mobuchowski commented on PR #45294:
URL: https://github.com/apache/airflow/pull/45294#issuecomment-2644101721
Hey @vikramkoka - rebased this PR and fixed conflicts; it should be good to merge. Haven't started on the follow-up to https://github.com/apache/airflow/pull/45732 yet, but will try to find some time next week. I don't think we need to hold up this PR for that, especially since rebasing a 2000-line PR on a fast-moving target takes a significant amount of time.

vikramkoka commented on PR #45294:
URL: https://github.com/apache/airflow/pull/45294#issuecomment-2643456814
@mobuchowski Hope all is well. Just checking in on how this is coming along. You had also mentioned that you would be raising another PR once https://github.com/apache/airflow/pull/45732 was merged. Since that is already merged, is that follow-up PR already in progress? Thank you!

mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1940171828
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -199,3 +203,6 @@ class TIRunContext(BaseModel):
     connections: Annotated[list[ConnectionResponse], Field(default_factory=list)]
     """Connections that can be accessed by the task instance."""
+
+    start_date: Annotated[UtcDateTime, Field(title="Start Date")]
+    """Start date of the task instance."""
Review Comment:
Put it into `StartupDetails`.

mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1927120490
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -191,6 +192,9 @@ class TIRunContext(BaseModel):
     dag_run: DagRun
     """DAG run information for the task instance."""
+    task_reschedule_count: Annotated[int, Field(default=0)]
+    """How many times the task has been rescheduled."""
Review Comment:
Added tests that check that the overtime mechanism works. I also have a similar doc here: https://docs.google.com/document/d/1Vu1g6Xe0m-QVUZd6uWJ9StqaVF_aXR_Li8p71bZBbow/edit?tab=t.0

mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1925685019
## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ##
@@ -535,10 +556,16 @@ def run(ti: RuntimeTaskInstance, log: Logger):
             state=TerminalTIState.FAILED,
             end_date=datetime.now(tz=timezone.utc),
         )
+        get_listener_manager().hook.on_task_instance_failed(
+            previous_state=TaskInstanceState.RUNNING, task_instance=ti
+        )
         # TODO: Run task failure callbacks here
     except BaseException:
         log.exception("Task failed with exception")
         # TODO: Run task failure callbacks here
+        get_listener_manager().hook.on_task_instance_failed(
+            previous_state=TaskInstanceState.RUNNING, task_instance=ti
+        )
Review Comment:
Done.

mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1925167489
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -181,6 +182,7 @@ class DagRun(BaseModel):
     data_interval_end: UtcDateTime | None
     start_date: UtcDateTime
     end_date: UtcDateTime | None
+    clear_number: int
Review Comment:
I agree that this needs to change, but I would leave that for a follow-up PR after the actual change in Airflow behavior gets merged: https://github.com/apache/airflow/pull/45732

ashb commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1924049855
## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ##
@@ -535,10 +556,16 @@ def run(ti: RuntimeTaskInstance, log: Logger):
             state=TerminalTIState.FAILED,
             end_date=datetime.now(tz=timezone.utc),
         )
+        get_listener_manager().hook.on_task_instance_failed(
+            previous_state=TaskInstanceState.RUNNING, task_instance=ti
+        )
         # TODO: Run task failure callbacks here
     except BaseException:
         log.exception("Task failed with exception")
         # TODO: Run task failure callbacks here
+        get_listener_manager().hook.on_task_instance_failed(
+            previous_state=TaskInstanceState.RUNNING, task_instance=ti
+        )
Review Comment:
Yeah, listeners, certainly post-execute ones, in `finalize` please.

mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923976014
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -199,3 +203,6 @@ class TIRunContext(BaseModel):
     connections: Annotated[list[ConnectionResponse], Field(default_factory=list)]
     """Connections that can be accessed by the task instance."""
+
+    start_date: Annotated[UtcDateTime, Field(title="Start Date")]
+    """Start date of the task instance."""
Review Comment:
Yeah - this is on the supervisor though; the start_date is not passed to the task runner. I can move it into `StartupDetails`, though, rather than round-tripping it through the API. Or into the (non-server) `TIRunContext`.
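A minimal sketch of the idea in the comment above. It assumes that the supervisor records the start time when it launches the task and forwards that time in its startup message to the task runner. The `StartupDetails` shown here is a hypothetical, trimmed-down dataclass with illustrative fields (`ti_id`, `dag_rel_path`, `start_date`). It is not the SDK's actual model, and `build_startup_message` is a hypothetical helper.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class StartupDetails:
    """Hypothetical, trimmed-down startup message from supervisor to task runner."""

    ti_id: str
    dag_rel_path: str
    # Known to the supervisor when it launches the task, so it can be forwarded
    # directly instead of being round-tripped through the Execution API.
    start_date: datetime


def build_startup_message(ti_id: str, dag_rel_path: str) -> StartupDetails:
    # Record the start time once, in UTC, on the supervisor side.
    start_date = datetime.now(tz=timezone.utc)
    return StartupDetails(ti_id=ti_id, dag_rel_path=dag_rel_path, start_date=start_date)


msg = build_startup_message("example-ti-id", "dags/example.py")
print(msg.start_date.tzinfo)  # UTC
```

With this design the task runner never has to call the API to learn when it started, because the value arrives with the rest of the startup payload.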

kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923337346
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -181,6 +182,7 @@ class DagRun(BaseModel):
     data_interval_end: UtcDateTime | None
     start_date: UtcDateTime
     end_date: UtcDateTime | None
+    clear_number: int
Review Comment:
Correct me if I am wrong -- but the (`dag_id`, `clear_number` & `logical_date`) combination won't be unique anymore in AF 3.0 -- since `logical_date` would accept Null values, no? @uranusjr
https://github.com/apache/airflow/blob/90eae569db448bf24afd4b14505f055100c1193e/providers/src/airflow/providers/openlineage/plugins/adapter.py#L117-L124
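The following sketch illustrates this concern. It uses `uuid.uuid5` purely as a stand-in for whatever deterministic scheme the OpenLineage adapter actually uses. It assumes two distinct runs of the same DAG that both have `logical_date=None`. Under that assumption, a key built only from `dag_id`, `logical_date`, and `clear_number` collides. One possible alternative, keying on `run_id` (which is unique per DAG), is shown for comparison. Both helpers are hypothetical, and this alternative is not what the PR implements.

```python
from __future__ import annotations

import uuid
from datetime import datetime

# Arbitrary namespace chosen for this illustration only.
NAMESPACE = uuid.NAMESPACE_URL


def naive_run_uuid(dag_id: str, logical_date: datetime | None, clear_number: int) -> uuid.UUID:
    # Keyed only on (dag_id, logical_date, clear_number).
    date_part = logical_date.isoformat() if logical_date is not None else "None"
    return uuid.uuid5(NAMESPACE, f"{dag_id}.{date_part}.{clear_number}")


def run_id_based_uuid(dag_id: str, run_id: str, clear_number: int) -> uuid.UUID:
    # (dag_id, run_id) identifies a DAG run, so this still separates runs
    # that have no logical_date.
    return uuid.uuid5(NAMESPACE, f"{dag_id}.{run_id}.{clear_number}")


# Two distinct manually triggered runs of the same DAG, neither with a logical_date,
# produce identical inputs for the naive key.
a = naive_run_uuid("my_dag", None, 0)
b = naive_run_uuid("my_dag", None, 0)
print(a == b)  # True: the naive key collides

c = run_id_based_uuid("my_dag", "manual__run_a", 0)
d = run_id_based_uuid("my_dag", "manual__run_b", 0)
print(c == d)  # False
```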

uranusjr commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923251268
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -181,6 +182,7 @@ class DagRun(BaseModel):
     data_interval_end: UtcDateTime | None
     start_date: UtcDateTime
     end_date: UtcDateTime | None
+    clear_number: int
Review Comment:
Conceptually (in OpenLineage), if you clear and rerun a DAG run, the two runs (before and after the clear) are treated as entirely different objects. I believe this is kind of what we want to do in Airflow in the long run (similar to how we added TaskInstanceHistory), but before that happens, OL needs clear_number to distinguish _logically_ different runs that reuse the same DR row and otherwise have exactly the same identity (run_id, and even the UUID pk).

kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923135933
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -191,6 +192,9 @@ class TIRunContext(BaseModel):
     dag_run: DagRun
     """DAG run information for the task instance."""
+    task_reschedule_count: Annotated[int, Field(default=0)]
+    """How many times the task has been rescheduled."""
Review Comment:
We also have `TASK_OVERTIME_THRESHOLD` in the supervisor process, so wherever we hook the listeners, let's verify that the hook location respects it, so that we don't run into the "stuck" tasks issue.
https://github.com/apache/airflow/blob/41b151e7dde473ec445f9f78fb4b8db826c368fc/task_sdk/src/airflow/sdk/execution_time/supervisor.py#L573-L576
https://github.com/apache/airflow/blob/41b151e7dde473ec445f9f78fb4b8db826c368fc/task_sdk/src/airflow/sdk/execution_time/supervisor.py#L686-L697
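The following sketch shows why listener placement interacts with the overtime check. It assumes that the supervisor kills a task process that keeps running for longer than `TASK_OVERTIME_THRESHOLD` after reporting a terminal state. `OvertimeWatch` and the 20-second value are hypothetical. The clock is passed in explicitly so that the logic can run without real processes.

```python
from __future__ import annotations

from dataclasses import dataclass

TASK_OVERTIME_THRESHOLD = 20.0  # seconds; illustrative value only


@dataclass
class OvertimeWatch:
    """Hypothetical tracker for how long a task process lingers after reporting a terminal state."""

    terminal_state_at: float | None = None

    def mark_terminal(self, now: float) -> None:
        # Called once the task runner reports a terminal state (success, failed, ...).
        if self.terminal_state_at is None:
            self.terminal_state_at = now

    def should_kill(self, now: float, process_exited: bool) -> bool:
        # Work the runner still does after reporting (e.g. post-execute listeners)
        # must finish within the threshold, otherwise the process is killed.
        if process_exited or self.terminal_state_at is None:
            return False
        return now - self.terminal_state_at > TASK_OVERTIME_THRESHOLD


watch = OvertimeWatch()
watch.mark_terminal(now=100.0)
print(watch.should_kill(now=110.0, process_exited=False))  # False: within the threshold
print(watch.should_kill(now=125.0, process_exited=False))  # True: listeners ran too long
```

Under this assumption, a slow listener (for example, one that emits lineage over the network) that runs after the terminal state is reported eats into the same budget. That budget is what the comment above asks to verify.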

kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923133875
## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ##
@@ -535,10 +556,16 @@ def run(ti: RuntimeTaskInstance, log: Logger):
             state=TerminalTIState.FAILED,
             end_date=datetime.now(tz=timezone.utc),
         )
+        get_listener_manager().hook.on_task_instance_failed(
+            previous_state=TaskInstanceState.RUNNING, task_instance=ti
+        )
         # TODO: Run task failure callbacks here
     except BaseException:
         log.exception("Task failed with exception")
         # TODO: Run task failure callbacks here
+        get_listener_manager().hook.on_task_instance_failed(
+            previous_state=TaskInstanceState.RUNNING, task_instance=ti
+        )
Review Comment:
iirc @ashb had Listeners in mind for the `finalize` function.

kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923132467
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -191,6 +192,9 @@ class TIRunContext(BaseModel):
     dag_run: DagRun
     """DAG run information for the task instance."""
+    task_reschedule_count: Annotated[int, Field(default=0)]
+    """How many times the task has been rescheduled."""
Review Comment:
Do we have a brief doc listing where we plan to hook all the listeners? I.e., I am assuming you would want some information from the worker and some from the scheduler. I think it's worth drafting that up, along with what info is required. We can then collectively agree on it, and once it's agreed, it should become easier to implement and merge.
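One use of `task_reschedule_count` is to let a listener skip a duplicate START event when a rescheduled task runs again. The sketch below assumes that the count is exposed as a key in the template context, as the OpenLineage listener diff in this thread implies. If that context is a plain dict (or a `TypedDict`), the key has to be checked with `.get()` or `in`. Written as in that diff, `hasattr(context, "task_reschedule_count")` looks for an attribute, so it returns `False` even when the key is present. `should_emit_start_event` is a hypothetical helper.

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any


def should_emit_start_event(context: Mapping[str, Any]) -> bool:
    # Check the key, not an attribute: on a dict, hasattr() looks for an
    # attribute, so hasattr(context, "task_reschedule_count") is always False.
    return context.get("task_reschedule_count", 0) == 0


print(should_emit_start_event({"task_reschedule_count": 0}))  # True: first attempt
print(should_emit_start_event({"task_reschedule_count": 2}))  # False: START already emitted
print(should_emit_start_event({}))  # True: key missing, treat as not rescheduled
print(hasattr({"task_reschedule_count": 2}, "task_reschedule_count"))  # False
```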

kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923131084
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -199,3 +203,6 @@ class TIRunContext(BaseModel):
     connections: Annotated[list[ConnectionResponse], Field(default_factory=list)]
     """Connections that can be accessed by the task instance."""
+
+    start_date: Annotated[UtcDateTime, Field(title="Start Date")]
+    """Start date of the task instance."""
Review Comment:
This info is readily available here:
https://github.com/apache/airflow/blob/41b151e7dde473ec445f9f78fb4b8db826c368fc/task_sdk/src/airflow/sdk/execution_time/supervisor.py#L599-L605
I am wondering what would be the best place to hook the `on_start` listener.

kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923130647
## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ##
@@ -535,10 +556,16 @@ def run(ti: RuntimeTaskInstance, log: Logger):
             state=TerminalTIState.FAILED,
             end_date=datetime.now(tz=timezone.utc),
         )
+        get_listener_manager().hook.on_task_instance_failed(
+            previous_state=TaskInstanceState.RUNNING, task_instance=ti
+        )
         # TODO: Run task failure callbacks here
     except BaseException:
         log.exception("Task failed with exception")
         # TODO: Run task failure callbacks here
+        get_listener_manager().hook.on_task_instance_failed(
+            previous_state=TaskInstanceState.RUNNING, task_instance=ti
+        )
Review Comment:
Should we move all the calls to `finalize` and change the signature of `finalize` to `state, log`, etc.? That might be a better design than putting this in every `except` block.
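A minimal sketch of the suggested design, using stand-ins for the SDK pieces. `TerminalState`, `RecordingHooks` (in place of `get_listener_manager().hook`), and the simplified `run`/`finalize` pair are all hypothetical. In this sketch, `run` only decides the terminal state, and `finalize` is the single place that dispatches to the matching listener hook. For brevity the sketch catches `Exception`, whereas the diff above catches `BaseException`.

```python
from __future__ import annotations

from enum import Enum
from typing import Callable


class TerminalState(str, Enum):
    """Trimmed-down stand-in for the SDK's terminal task states."""

    SUCCESS = "success"
    FAILED = "failed"


class RecordingHooks:
    """Stand-in for get_listener_manager().hook that only records which hook fired."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def on_task_instance_success(self, previous_state: str, task_instance: object) -> None:
        self.calls.append("success")

    def on_task_instance_failed(self, previous_state: str, task_instance: object) -> None:
        self.calls.append("failed")


def finalize(ti: object, state: TerminalState, hooks: RecordingHooks) -> None:
    # Single place that fires post-execute listeners, so the hook call is not
    # repeated in every `except` block of run().
    if state is TerminalState.SUCCESS:
        hooks.on_task_instance_success(previous_state="running", task_instance=ti)
    else:
        hooks.on_task_instance_failed(previous_state="running", task_instance=ti)


def run(ti: object, task: Callable[[], None], hooks: RecordingHooks) -> TerminalState:
    # run() only decides the terminal state; finalize() handles the listeners.
    try:
        task()
        state = TerminalState.SUCCESS
    except Exception:
        state = TerminalState.FAILED
    finalize(ti, state, hooks)
    return state


def failing_task() -> None:
    raise RuntimeError("boom")


hooks = RecordingHooks()
run(object(), lambda: None, hooks)
run(object(), failing_task, hooks)
print(hooks.calls)  # ['success', 'failed']
```

Adding a new terminal state or a new hook then touches only `finalize`, rather than each exception handler.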

kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923122101
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -181,6 +182,7 @@ class DagRun(BaseModel):
     data_interval_end: UtcDateTime | None
     start_date: UtcDateTime
     end_date: UtcDateTime | None
+    clear_number: int
Review Comment:
I should have clarified better: I meant that having `clear_number` on the `DagRun` runtime model feels odd, i.e. the model doesn't require it. Since `logical_date` can now be `None` too (see the link below), I think you might need to refactor the logic for generating the DR UUID.
https://lists.apache.org/thread/cknldkl9pmmzr1q7ot67wborzznlwrtv

mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1922607800
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -165,6 +165,7 @@ class TaskInstance(BaseModel):
     try_number: int
     map_index: int = -1
     hostname: str | None = None
+    start_date: UtcDateTime
Review Comment:
Ok - moved it into the context instead.

mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1922607530
## airflow/executors/workloads.py: ##
@@ -75,6 +77,15 @@ def key(self) -> TaskInstanceKey:
         )
+class DagRun(BaseModel):
+    id: int
+    dag_id: str
+    run_id: str
+    logical_date: datetime
+    data_interval_start: datetime
+    data_interval_end: datetime
Review Comment:
This one isn't needed now that the DR comes from the context, so I removed it.

mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1922581314
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -181,6 +182,7 @@ class DagRun(BaseModel):
     data_interval_end: UtcDateTime | None
     start_date: UtcDateTime
     end_date: UtcDateTime | None
+    clear_number: int
Review Comment:
It is part of the current DagRun model :)
https://github.com/apache/airflow/blob/02d83b0768ff8ad3024b3a42cf8829789867861f/airflow/models/dagrun.py#L160
We need it to properly generate the DR UUID, so that events from different physical executions of a DAG run aren't mixed up:
https://github.com/apache/airflow/blob/90eae569db448bf24afd4b14505f055100c1193e/providers/src/airflow/providers/openlineage/plugins/adapter.py#L122
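The following sketch shows why `clear_number` matters for a deterministic DAG-run ID. It uses `uuid.uuid5` as a stand-in; the adapter linked above has its own generation scheme, and `dag_run_uuid` is a hypothetical helper. The same `(dag_id, logical_date, clear_number)` always yields the same UUID, so all events from one physical execution share a parent run ID. Clearing the run bumps `clear_number`, which yields a new UUID.

```python
from __future__ import annotations

import uuid
from datetime import datetime, timezone


def dag_run_uuid(dag_id: str, logical_date: datetime, clear_number: int) -> uuid.UUID:
    # Deterministic: identical inputs always give the same UUID, so every task in
    # one physical execution of a DAG run reports the same parent run ID.
    key = f"{dag_id}.{logical_date.isoformat()}.{clear_number}"
    return uuid.uuid5(uuid.NAMESPACE_URL, key)


when = datetime(2025, 1, 20, tzinfo=timezone.utc)
first = dag_run_uuid("my_dag", when, clear_number=0)
again = dag_run_uuid("my_dag", when, clear_number=0)
after_clear = dag_run_uuid("my_dag", when, clear_number=1)

print(first == again)  # True: same physical execution
print(first == after_clear)  # False: clearing the run starts a new lineage run
```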

kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1922064634
## airflow/executors/workloads.py: ##
@@ -75,6 +77,15 @@ def key(self) -> TaskInstanceKey:
         )
+class DagRun(BaseModel):
+    id: int
+    dag_id: str
+    run_id: str
+    logical_date: datetime
+    data_interval_start: datetime
+    data_interval_end: datetime
Review Comment:
Looks like a lot of classes are duplicated across this file and `airflow/api_fastapi/execution_api/datamodels`. We should figure this out as part of our packaging discussion, if this entire file is going to be moved to a separate package.

kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1922054772
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -181,6 +182,7 @@ class DagRun(BaseModel):
     data_interval_end: UtcDateTime | None
     start_date: UtcDateTime
     end_date: UtcDateTime | None
+    clear_number: int
Review Comment:
It's odd to have `clear_number` in the DagRun datamodel! What do we need this for? Can we get it from somewhere else?

kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1922049056
## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ##
@@ -165,6 +165,7 @@ class TaskInstance(BaseModel):
     try_number: int
     map_index: int = -1
     hostname: str | None = None
+    start_date: UtcDateTime
Review Comment:
You can get that from the code linked below. The TaskInstance won't have that field, and it can't be mandatory.
https://github.com/apache/airflow/blob/ee785a89ba27a59246cdfcc0d83129b691a39f3e/airflow/api_fastapi/execution_api/datamodels/taskinstance.py#L36-L51

mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1910383905
## providers/tests/openlineage/plugins/test_listener.py: ##
Review Comment:
Fixed this.
Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]
mobuchowski commented on code in PR #45294: URL: https://github.com/apache/airflow/pull/45294#discussion_r1910383560 ## providers/src/airflow/providers/openlineage/plugins/listener.py: ## @@ -87,28 +88,58 @@ def __init__(self): self.extractor_manager = ExtractorManager() self.adapter = OpenLineageAdapter() -@hookimpl -def on_task_instance_running( -self, -previous_state: TaskInstanceState, -task_instance: TaskInstance, -session: Session, # This will always be QUEUED -) -> None: -if not getattr(task_instance, "task", None) is not None: -self.log.warning( -"No task set for TI object task_id: %s - dag_id: %s - run_id %s", -task_instance.task_id, -task_instance.dag_id, -task_instance.run_id, -) -return +if AIRFLOW_V_3_0_PLUS: -self.log.debug("OpenLineage listener got notification about task instance start") -dagrun = task_instance.dag_run -task = task_instance.task -if TYPE_CHECKING: -assert task -dag = task.dag +@hookimpl +def on_task_instance_running( +self, +previous_state: TaskInstanceState, +task_instance: RuntimeTaskInstance, +): +if not getattr(task_instance, "task", None) is not None: +self.log.warning( +"No task set for TI object task_id: %s - dag_id: %s - run_id %s", +task_instance.task_id, +task_instance.dag_id, +task_instance.run_id, +) +return Review Comment: Actually nope - `RuntimeTaskInstance` prevents that. So, removed that check. 
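In the hunk above, the Airflow 3 variant of `on_task_instance_running` is defined inside an `if AIRFLOW_V_3_0_PLUS:` block in the class body. A class body executes top to bottom like any other code, so only one of the two definitions ends up bound on the class. A minimal sketch of that pattern, assuming a hard-coded stand-in for the flag and omitting pluggy's `@hookimpl` decorator (both are simplifications, not the provider's actual code):

```python
# Hypothetical stand-in for the provider's version flag; the real listener
# imports AIRFLOW_V_3_0_PLUS from its version-compat module.
AIRFLOW_V_3_0_PLUS = True


class ListenerSketch:
    """Binds exactly one hook signature, chosen when the class body runs."""

    if AIRFLOW_V_3_0_PLUS:

        def on_task_instance_running(self, previous_state, task_instance):
            # Airflow 3 shape: no `session` argument, runtime TI object
            return ("v3", task_instance)

    else:

        def on_task_instance_running(self, previous_state, task_instance, session):
            # Airflow 2 shape: the hook also receives a SQLAlchemy session
            return ("v2", task_instance)


print(ListenerSketch().on_task_instance_running(None, "ti"))  # ('v3', 'ti')
```

Flipping the stand-in flag to `False` would bind the variant that takes `session` instead, so callers on either version see a single hook with the signature they expect.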
## providers/src/airflow/providers/openlineage/plugins/listener.py: ## @@ -127,35 +158,34 @@ def on_task_instance_running( return # Needs to be calculated outside of inner method so that it gets cached for usage in fork processes +data_interval_start = dagrun.data_interval_start +if isinstance(data_interval_start, datetime): +data_interval_start = data_interval_start.isoformat() +data_interval_end = dagrun.data_interval_end +if isinstance(data_interval_end, datetime): +data_interval_end = data_interval_end.isoformat() + debug_facet = get_airflow_debug_facet() @print_warning(self.log) def on_running(): -# that's a workaround to detect task running from deferred state -# we return here because Airflow 2.3 needs task from deferred state -if task_instance.next_method is not None: -return - -if is_ti_rescheduled_already(task_instance): +context = task_instance.get_template_context() +if hasattr(context, "task_reschedule_count") and context["task_reschedule_count"] > 0: self.log.debug("Skipping this instance of rescheduled task - START event was emitted already") return parent_run_id = self.adapter.build_dag_run_id( dag_id=dag.dag_id, logical_date=dagrun.logical_date, -clear_number=dagrun.clear_number, +clear_number=0, Review Comment: Fixed. ## providers/tests/openlineage/extractors/test_manager.py: ## @@ -324,3 +347,123 @@ def use_read(): assert len(datasets.outputs) == 1 assert datasets.outputs[0].asset == Asset(uri=path) + + +@pytest.fixture +def mock_supervisor_comms(): +with mock.patch( +"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True +) as supervisor_comms: +yield supervisor_comms + + +@pytest.fixture +def mocked_parse(spy_agency): +""" +Fixture to set up an inline DAG and use it in a stubbed `parse` function. Use this fixture if you +want to isolate and test `parse` or `run` logic without having to define a DAG file. + +This fixture returns a helper function `set_dag` that: +1. 
Creates an in line DAG with the given `dag_id` and `task` (limited to one task) +2. Constructs a `RuntimeTaskInstance` based on the provided `StartupDetails` and task. +3. Stubs the `parse` function using `spy_agency`, to return the mocked `RuntimeTaskInstance`. + +After adding the fixture in your test function signature, you can use it like this :: + +mocked_parse( +StartupDetails( +ti=TaskInstance(id=uuid7(), task_id="hello", dag_id="super_basic_run", run_id="c", try_number=1), +file="", +requests_fd=0, +), +"example_dag_id", +CustomOperator(task_id="hello"), +) +""" + +def set_dag(what: StartupDetails, dag_id: str, task: BaseOperator) -> Ru
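The `mock_supervisor_comms` fixture in this hunk passes `create=True` to `mock.patch`. Without it, `unittest.mock.patch` raises `AttributeError` when the target attribute does not already exist, which is presumably the case for a module-level `SUPERVISOR_COMMS` that is only assigned at runtime. A self-contained sketch against a throwaway module (`fake_task_runner` is hypothetical) shows both behaviors:

```python
import sys
import types
from unittest import mock

# Hypothetical module standing in for task_runner. It deliberately lacks
# SUPERVISOR_COMMS, like a module attribute that is only assigned at runtime.
fake_runner = types.ModuleType("fake_task_runner")
sys.modules["fake_task_runner"] = fake_runner


def patch_without_create():
    try:
        with mock.patch("fake_task_runner.SUPERVISOR_COMMS"):
            return "patched"
    except AttributeError:
        # Without create=True, patch refuses to replace a missing attribute
        return "AttributeError"


def patch_with_create():
    with mock.patch("fake_task_runner.SUPERVISOR_COMMS", create=True) as comms:
        inside = fake_runner.SUPERVISOR_COMMS is comms
    # On exit, patch deletes the attribute it created
    return inside, hasattr(fake_runner, "SUPERVISOR_COMMS")


print(patch_without_create())  # AttributeError
print(patch_with_create())  # (True, False)
```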
Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]
mobuchowski commented on code in PR #45294: URL: https://github.com/apache/airflow/pull/45294#discussion_r1910383193 ## providers/tests/openlineage/plugins/test_execution.py: ## @@ -116,6 +116,7 @@ def test_not_stalled_task_emits_proper_lineage(self): self.setup_job(task_name, run_id) events = get_sorted_events(tmp_dir) +log.error(events) Review Comment: Removed.
Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]
mobuchowski commented on code in PR #45294: URL: https://github.com/apache/airflow/pull/45294#discussion_r1910371400 ## providers/src/airflow/providers/openlineage/plugins/listener.py: ## @@ -127,35 +158,34 @@ def on_task_instance_running( return # Needs to be calculated outside of inner method so that it gets cached for usage in fork processes +data_interval_start = dagrun.data_interval_start +if isinstance(data_interval_start, datetime): +data_interval_start = data_interval_start.isoformat() +data_interval_end = dagrun.data_interval_end +if isinstance(data_interval_end, datetime): +data_interval_end = data_interval_end.isoformat() + debug_facet = get_airflow_debug_facet() @print_warning(self.log) def on_running(): -# that's a workaround to detect task running from deferred state -# we return here because Airflow 2.3 needs task from deferred state -if task_instance.next_method is not None: -return - -if is_ti_rescheduled_already(task_instance): +context = task_instance.get_template_context() +if hasattr(context, "task_reschedule_count") and context["task_reschedule_count"] > 0: self.log.debug("Skipping this instance of rescheduled task - START event was emitted already") return Review Comment: I think the context will eventually be cached in Airflow 3; it's essentially a static dict at this point. I'd leave optimizations for later.
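A side note on the check quoted in this hunk: `hasattr()` looks up attributes, not mapping keys, so `hasattr(context, "task_reschedule_count")` is `False` for a plain dict even when the key is present, and the skip branch would never run. A minimal sketch of a key-based check, assuming `get_template_context()` returns a plain dict (a `TypedDict`-style context is an ordinary dict at runtime); `should_skip_start_event` is a hypothetical helper name:

```python
from collections.abc import Mapping


def should_skip_start_event(context: Mapping) -> bool:
    """Hypothetical helper: True when a rescheduled task already emitted START."""
    # Look the key up on the mapping itself; hasattr() would inspect the
    # attributes of the dict object, where context keys never appear.
    return context.get("task_reschedule_count", 0) > 0


ctx = {"task_reschedule_count": 2}
print(hasattr(ctx, "task_reschedule_count"))  # False: the attribute check never fires
print(should_skip_start_event(ctx))  # True
print(should_skip_start_event({}))  # False
```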
Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]
kacpermuda commented on code in PR #45294: URL: https://github.com/apache/airflow/pull/45294#discussion_r1909253802 ## providers/tests/openlineage/plugins/test_execution.py: ## @@ -116,6 +116,7 @@ def test_not_stalled_task_emits_proper_lineage(self): self.setup_job(task_name, run_id) events = get_sorted_events(tmp_dir) +log.error(events) Review Comment: Some leftover from testing? ## providers/src/airflow/providers/openlineage/plugins/listener.py: ## @@ -127,35 +158,34 @@ def on_task_instance_running( return # Needs to be calculated outside of inner method so that it gets cached for usage in fork processes +data_interval_start = dagrun.data_interval_start +if isinstance(data_interval_start, datetime): +data_interval_start = data_interval_start.isoformat() +data_interval_end = dagrun.data_interval_end +if isinstance(data_interval_end, datetime): +data_interval_end = data_interval_end.isoformat() + debug_facet = get_airflow_debug_facet() @print_warning(self.log) def on_running(): -# that's a workaround to detect task running from deferred state -# we return here because Airflow 2.3 needs task from deferred state -if task_instance.next_method is not None: -return - -if is_ti_rescheduled_already(task_instance): +context = task_instance.get_template_context() +if hasattr(context, "task_reschedule_count") and context["task_reschedule_count"] > 0: self.log.debug("Skipping this instance of rescheduled task - START event was emitted already") return parent_run_id = self.adapter.build_dag_run_id( dag_id=dag.dag_id, logical_date=dagrun.logical_date, -clear_number=dagrun.clear_number, +clear_number=0, Review Comment: Why is it always set to 0?
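For context on this question: per the diff, `build_dag_run_id` takes `dag_id`, `logical_date` and `clear_number`. If the parent run id is a deterministic function of those inputs, pinning `clear_number=0` means a cleared and re-run DAG run would reuse the id of its first attempt. The sketch below illustrates that under an explicit assumption: `sketch_dag_run_id` and its UUIDv5 derivation are hypothetical and are not the OpenLineage adapter's actual scheme.

```python
import uuid
from datetime import datetime, timezone


def sketch_dag_run_id(dag_id: str, logical_date: datetime, clear_number: int) -> str:
    """Hypothetical deterministic run id; NOT the OpenLineage adapter's real scheme."""
    key = f"{dag_id}.{logical_date.isoformat()}.{clear_number}"
    # uuid5 is name-based: the same key always yields the same UUID.
    # NAMESPACE_OID is just an arbitrary fixed namespace for this sketch.
    return str(uuid.uuid5(uuid.NAMESPACE_OID, key))


when = datetime(2025, 1, 1, tzinfo=timezone.utc)
first_attempt = sketch_dag_run_id("example_dag", when, clear_number=0)
after_clear = sketch_dag_run_id("example_dag", when, clear_number=1)
pinned_after_clear = sketch_dag_run_id("example_dag", when, clear_number=0)

# Passing the real clear number gives the cleared run a fresh parent id...
assert after_clear != first_attempt
# ...while hard-coding clear_number=0 makes it collide with the first attempt.
assert pinned_after_clear == first_attempt
```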
## providers/src/airflow/providers/openlineage/plugins/listener.py: ## @@ -127,35 +158,34 @@ def on_task_instance_running( return # Needs to be calculated outside of inner method so that it gets cached for usage in fork processes +data_interval_start = dagrun.data_interval_start +if isinstance(data_interval_start, datetime): +data_interval_start = data_interval_start.isoformat() +data_interval_end = dagrun.data_interval_end +if isinstance(data_interval_end, datetime): +data_interval_end = data_interval_end.isoformat() + debug_facet = get_airflow_debug_facet() @print_warning(self.log) def on_running(): -# that's a workaround to detect task running from deferred state -# we return here because Airflow 2.3 needs task from deferred state -if task_instance.next_method is not None: -return - -if is_ti_rescheduled_already(task_instance): +context = task_instance.get_template_context() +if hasattr(context, "task_reschedule_count") and context["task_reschedule_count"] > 0: self.log.debug("Skipping this instance of rescheduled task - START event was emitted already") return Review Comment: Since it's skipping the entire event emission, maybe we can move it somewhere higher up? We already have the template context in `on_task_instance_running` for Airflow 3; we could use it there instead of re-creating it just for this purpose. I'm not sure how heavy getting the template context is. ## providers/tests/openlineage/plugins/test_listener.py: ## Review Comment: nit: There are places within this file where we condition the whole test class on Airflow 2 or 3 and then still condition things on the AIRFLOW_V_3_0_PLUS variable within the test cases belonging to that class. We can probably remove those additional conditions if the whole test class will be skipped anyway.
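The nit above is about redundancy: once a whole test class is skipped unless Airflow 3 is installed, an extra `if AIRFLOW_V_3_0_PLUS:` inside its tests can never take the other branch. A minimal sketch of the simplified shape, using the standard library's `unittest.skipIf` so it runs on its own (the provider's tests use pytest, where `pytest.mark.skipif` plays the same role); the flag value and class name are hypothetical:

```python
import unittest

# Hypothetical stand-in for the provider's version flag
AIRFLOW_V_3_0_PLUS = True


@unittest.skipIf(not AIRFLOW_V_3_0_PLUS, "Airflow 3 only")
class TestListenerAirflow3(unittest.TestCase):
    def test_running_hook_has_no_session(self):
        # No inner `if AIRFLOW_V_3_0_PLUS:` guard: the class-level skip already
        # guarantees this body only runs when the flag is true.
        hook_params = ("self", "previous_state", "task_instance")
        self.assertNotIn("session", hook_params)
```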
## providers/src/airflow/providers/openlineage/plugins/listener.py: ## @@ -87,28 +88,58 @@ def __init__(self): self.extractor_manager = ExtractorManager() self.adapter = OpenLineageAdapter() -@hookimpl -def on_task_instance_running( -self, -previous_state: TaskInstanceState, -task_instance: TaskInstance, -session: Session, # This will always be QUEUED -) -> None: -if not getattr(task_instance, "task", None) is not None: -self.log.warning( -"No task set for TI object task_id: %s - dag_id: %s - run_id %s", -task_instance.task_id, -task_instance.dag_id, -task_instance.run_id, -) -return +if AIRFLOW_V_3_0_PLUS: -self.log.debug("OpenLineag
Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]
mobuchowski commented on code in PR #45294: URL: https://github.com/apache/airflow/pull/45294#discussion_r1907446623 ## airflow/api_fastapi/execution_api/datamodels/dagrun.py: ## @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This model is not used in the API, but it is included in generated OpenAPI schema +# for use in the client SDKs. +from __future__ import annotations + +from airflow.api_fastapi.common.types import UtcDateTime +from airflow.api_fastapi.core_api.base import BaseModel + + +class DagRun(BaseModel): +"""Schema for TaskInstance model with minimal required fields needed for OL for now.""" + +id: int +dag_id: str +run_id: str +logical_date: UtcDateTime +data_interval_start: UtcDateTime +data_interval_end: UtcDateTime +clear_number: int Review Comment: Removed this one - it existed because DagRun wasn't used in the API yet when I first created it. Now that it's used and OpenAPI generates it nicely, there's no point in having it here.
Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]
kaxil commented on code in PR #45294: URL: https://github.com/apache/airflow/pull/45294#discussion_r1907362185 ## airflow/api_fastapi/execution_api/datamodels/dagrun.py: ## @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This model is not used in the API, but it is included in generated OpenAPI schema +# for use in the client SDKs. +from __future__ import annotations + +from airflow.api_fastapi.common.types import UtcDateTime +from airflow.api_fastapi.core_api.base import BaseModel + + +class DagRun(BaseModel): +"""Schema for TaskInstance model with minimal required fields needed for OL for now.""" + +id: int +dag_id: str +run_id: str +logical_date: UtcDateTime +data_interval_start: UtcDateTime +data_interval_end: UtcDateTime +clear_number: int Review Comment: We already have a DagRun model in `airflow/api_fastapi/execution_api/datamodels/taskinstance.py`. Let's move that here or nuke this one -- I am fine with either :)