Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-02-11 Thread via GitHub


mobuchowski merged PR #45294:
URL: https://github.com/apache/airflow/pull/45294


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-02-11 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1950561783


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -935,6 +937,7 @@ def supervise(
 Run a single task execution to completion.
 
 :param ti: The task instance to run.
+:param dr: Current DagRun of the task instance.

Review Comment:
   Fixed. Leftover from going through a few iterations.



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-02-10 Thread via GitHub


uranusjr commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1950226981


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -935,6 +937,7 @@ def supervise(
 Run a single task execution to completion.
 
 :param ti: The task instance to run.
+:param dr: Current DagRun of the task instance.

Review Comment:
   This does not exist in the actual args? (actually `dag_path` below also does 
not exist; it's `dag_rel_path` in the args)
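
A mismatch like this between `:param` entries and the real arguments can be caught mechanically. The following is a minimal sketch using only the standard library; `supervise_example` is a hypothetical stand-in, not the real `supervise` signature, and its `dag_path` docstring text is made up for the example.

```python
import inspect
import re


def docstring_param_mismatch(func):
    """Compare `:param name:` entries in the docstring with the real signature.

    Returns (documented_but_absent, present_but_undocumented), both sorted.
    """
    doc = inspect.getdoc(func) or ""
    documented = set(re.findall(r":param\s+(\w+):", doc))
    actual = set(inspect.signature(func).parameters)
    return sorted(documented - actual), sorted(actual - documented)


# Hypothetical stand-in that mirrors the mismatch described above.
def supervise_example(*, ti, dag_rel_path):
    """
    Run a single task execution to completion.

    :param ti: The task instance to run.
    :param dr: Current DagRun of the task instance.
    :param dag_path: Path to the DAG file.
    """


stale, missing = docstring_param_mismatch(supervise_example)
```

Here `stale` lists names the docstring mentions but the function does not accept, and `missing` lists accepted arguments that have no `:param` entry.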



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-02-07 Thread via GitHub


mobuchowski commented on PR #45294:
URL: https://github.com/apache/airflow/pull/45294#issuecomment-2644101721

   Hey @vikramkoka - I rebased this PR and fixed the conflicts; it should be good to 
merge.
   
   I haven't started on the follow-up to https://github.com/apache/airflow/pull/45732 
but will try to find some time next week. I don't think we need to hold up this 
PR for that, especially since rebasing a 2000-line PR on a fast-moving target 
takes a significant amount of time.


Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-02-07 Thread via GitHub


vikramkoka commented on PR #45294:
URL: https://github.com/apache/airflow/pull/45294#issuecomment-2643456814

   @mobuchowski 
   Hope all is well. Just checking in: how is this coming along? 
   
   You had also mentioned that you would be raising another PR once 
https://github.com/apache/airflow/pull/45732 was merged. Since that is already 
merged, is that follow-up PR already in progress? 
   
   Thank you! 


Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-02-03 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1940171828


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -199,3 +203,6 @@ class TIRunContext(BaseModel):
 
 connections: Annotated[list[ConnectionResponse], Field(default_factory=list)]
 """Connections that can be accessed by the task instance."""
+
+start_date: Annotated[UtcDateTime, Field(title="Start Date")]
+"""Start date of the task instance."""

Review Comment:
   Put it into `StartupDetails`.



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-23 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1927120490


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -191,6 +192,9 @@ class TIRunContext(BaseModel):
 dag_run: DagRun
 """DAG run information for the task instance."""
 
+task_reschedule_count: Annotated[int, Field(default=0)]
+"""How many times the task has been rescheduled."""

Review Comment:
   Added tests that check that the overtime mechanism works.
   
   Also, there is a similar doc here: 
https://docs.google.com/document/d/1Vu1g6Xe0m-QVUZd6uWJ9StqaVF_aXR_Li8p71bZBbow/edit?tab=t.0



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-22 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1925685019


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -535,10 +556,16 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 state=TerminalTIState.FAILED,
 end_date=datetime.now(tz=timezone.utc),
 )
+get_listener_manager().hook.on_task_instance_failed(
+previous_state=TaskInstanceState.RUNNING, task_instance=ti
+)
 # TODO: Run task failure callbacks here
 except BaseException:
 log.exception("Task failed with exception")
 # TODO: Run task failure callbacks here
+get_listener_manager().hook.on_task_instance_failed(
+previous_state=TaskInstanceState.RUNNING, task_instance=ti
+)

Review Comment:
   Done.



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-22 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1925167489


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -181,6 +182,7 @@ class DagRun(BaseModel):
 data_interval_end: UtcDateTime | None
 start_date: UtcDateTime
 end_date: UtcDateTime | None
+clear_number: int

Review Comment:
   I agree that this needs to change, but I would leave that for a follow-up PR 
after the actual change in Airflow behavior gets merged: 
https://github.com/apache/airflow/pull/45732



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-21 Thread via GitHub


ashb commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1924049855


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -535,10 +556,16 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 state=TerminalTIState.FAILED,
 end_date=datetime.now(tz=timezone.utc),
 )
+get_listener_manager().hook.on_task_instance_failed(
+previous_state=TaskInstanceState.RUNNING, task_instance=ti
+)
 # TODO: Run task failure callbacks here
 except BaseException:
 log.exception("Task failed with exception")
 # TODO: Run task failure callbacks here
+get_listener_manager().hook.on_task_instance_failed(
+previous_state=TaskInstanceState.RUNNING, task_instance=ti
+)

Review Comment:
   Yeah listeners, certainly post-execute ones, in finalize please.



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-21 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923976014


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -199,3 +203,6 @@ class TIRunContext(BaseModel):
 
 connections: Annotated[list[ConnectionResponse], Field(default_factory=list)]
 """Connections that can be accessed by the task instance."""
+
+start_date: Annotated[UtcDateTime, Field(title="Start Date")]
+"""Start date of the task instance."""

Review Comment:
   Yeah - this is on the supervisor, though; the start_date is not passed to the 
task runner. I can move it into `StartupDetails` rather than round-tripping 
through the API, or into the (non-server) `TIRunContext`.



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-21 Thread via GitHub


kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923337346


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -181,6 +182,7 @@ class DagRun(BaseModel):
 data_interval_end: UtcDateTime | None
 start_date: UtcDateTime
 end_date: UtcDateTime | None
+clear_number: int

Review Comment:
   Correct me if I am wrong -- but the (`dag_id`, `clear_number` & 
`logical_date`) won't be unique anymore in AF 3.0 -- since `logical_date` would 
accept Null values, no? @uranusjr 
   
   
https://github.com/apache/airflow/blob/90eae569db448bf24afd4b14505f055100c1193e/providers/src/airflow/providers/openlineage/plugins/adapter.py#L117-L124



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-21 Thread via GitHub


uranusjr commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923251268


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -181,6 +182,7 @@ class DagRun(BaseModel):
 data_interval_end: UtcDateTime | None
 start_date: UtcDateTime
 end_date: UtcDateTime | None
+clear_number: int

Review Comment:
   Conceptually (in OpenLineage) if you clear and rerun a DAG run, the two runs 
(before and after the clear) are treated as entirely different objects. I 
believe this is kind of what we want to do in Airflow in the long run (similar 
to how we added TaskInstanceHistory), but before that happens, OL needs 
clear_number to distinguish _logically_ different runs that reuse the same DR 
row and have the exact same identity otherwise (run_id, and even the UUID pk).



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-20 Thread via GitHub


kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923135933


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -191,6 +192,9 @@ class TIRunContext(BaseModel):
 dag_run: DagRun
 """DAG run information for the task instance."""
 
+task_reschedule_count: Annotated[int, Field(default=0)]
+"""How many times the task has been rescheduled."""

Review Comment:
   We also have `TASK_OVERTIME_THRESHOLD` in the Supervisor process, so 
wherever we hook the listeners, let's verify that they are covered by it so we don't end up with 
tasks getting "stuck".
   
   
https://github.com/apache/airflow/blob/41b151e7dde473ec445f9f78fb4b8db826c368fc/task_sdk/src/airflow/sdk/execution_time/supervisor.py#L573-L576
   
   
https://github.com/apache/airflow/blob/41b151e7dde473ec445f9f78fb4b8db826c368fc/task_sdk/src/airflow/sdk/execution_time/supervisor.py#L686-L697
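
To make the concern concrete, here is a minimal sketch of an overtime check. It assumes the threshold bounds how long the task process may keep running after it has reported its final state. The function names and the `20.0` value are hypothetical placeholders for illustration, not the supervisor's actual code.

```python
import time

# Hypothetical value for this sketch; the real constant lives in supervisor.py.
TASK_OVERTIME_THRESHOLD = 20.0


def is_overtime(terminal_state_at, now, threshold=TASK_OVERTIME_THRESHOLD):
    """Return True once the process has lingered past the threshold.

    terminal_state_at is the monotonic time at which the task reported its
    final state, or None if it has not reported one yet.
    """
    if terminal_state_at is None:
        return False
    return (now - terminal_state_at) > threshold


def listener_exceeds_budget(listener, threshold=TASK_OVERTIME_THRESHOLD):
    """Time a single listener call and report whether it alone ran past the budget."""
    started = time.monotonic()
    listener()
    return is_overtime(started, time.monotonic(), threshold)
```

Under that assumption, a listener that runs after the final state is sent competes for the same time budget, which is why a slow listener could lead to the process being killed rather than left hanging.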



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-20 Thread via GitHub


kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923133875


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -535,10 +556,16 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 state=TerminalTIState.FAILED,
 end_date=datetime.now(tz=timezone.utc),
 )
+get_listener_manager().hook.on_task_instance_failed(
+previous_state=TaskInstanceState.RUNNING, task_instance=ti
+)
 # TODO: Run task failure callbacks here
 except BaseException:
 log.exception("Task failed with exception")
 # TODO: Run task failure callbacks here
+get_listener_manager().hook.on_task_instance_failed(
+previous_state=TaskInstanceState.RUNNING, task_instance=ti
+)

Review Comment:
   iirc @ashb had Listeners in mind for the `finalize` function.



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-20 Thread via GitHub


kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923132467


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -191,6 +192,9 @@ class TIRunContext(BaseModel):
 dag_run: DagRun
 """DAG run information for the task instance."""
 
+task_reschedule_count: Annotated[int, Field(default=0)]
+"""How many times the task has been rescheduled."""

Review Comment:
   Do we have a brief doc listing where we plan to hook all the listeners?
   
   i.e. I am assuming you would want some information from the worker and some 
from the scheduler. I think it is worth drafting that up, along with what info is 
required, so we can collectively agree on it; once agreed, it should become easier 
to implement and merge.



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-20 Thread via GitHub


kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923131084


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -199,3 +203,6 @@ class TIRunContext(BaseModel):
 
 connections: Annotated[list[ConnectionResponse], Field(default_factory=list)]
 """Connections that can be accessed by the task instance."""
+
+start_date: Annotated[UtcDateTime, Field(title="Start Date")]
+"""Start date of the task instance."""

Review Comment:
   This info is readily available here:
   
   
https://github.com/apache/airflow/blob/41b151e7dde473ec445f9f78fb4b8db826c368fc/task_sdk/src/airflow/sdk/execution_time/supervisor.py#L599-L605
   
   I am wondering what would be the best place to hook the `on_start` listener.



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-20 Thread via GitHub


kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923130647


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -535,10 +556,16 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 state=TerminalTIState.FAILED,
 end_date=datetime.now(tz=timezone.utc),
 )
+get_listener_manager().hook.on_task_instance_failed(
+previous_state=TaskInstanceState.RUNNING, task_instance=ti
+)
 # TODO: Run task failure callbacks here
 except BaseException:
 log.exception("Task failed with exception")
 # TODO: Run task failure callbacks here
+get_listener_manager().hook.on_task_instance_failed(
+previous_state=TaskInstanceState.RUNNING, task_instance=ti
+)

Review Comment:
   Should we move all the calls to `finalize`, and change the signature of `finalize` 
to take `state`, `log`, etc.?
   
   That might be a better design than putting this on all `except` blocks.
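
A minimal sketch of that shape, for illustration only: `State`, the listener functions, and this `finalize(task_instance, state, log)` signature are hypothetical stand-ins, not the Task SDK's actual API. Each branch of `run` only records a state, and listeners are notified in one place.

```python
from enum import Enum


class State(str, Enum):
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


calls = []  # records listener invocations so the flow can be inspected


def on_task_instance_success(previous_state, task_instance):
    calls.append(("success", task_instance))


def on_task_instance_failed(previous_state, task_instance):
    calls.append(("failed", task_instance))


def run(task_instance, execute, log=print):
    """Run the task; each branch only decides the final state."""
    try:
        execute()
        return State.SUCCESS
    except BaseException:
        log("Task failed with exception")
        return State.FAILED


def finalize(task_instance, state, log=print):
    """Single place that notifies listeners once the final state is known."""
    if state is State.SUCCESS:
        on_task_instance_success(State.RUNNING, task_instance)
    elif state is State.FAILED:
        on_task_instance_failed(State.RUNNING, task_instance)
    log(f"finalized {task_instance} as {state.value}")


logs = []
final_state = run("ti-1", lambda: 1 / 0, log=logs.append)
finalize("ti-1", final_state, log=logs.append)
```

With this layout, adding a new `except` branch to `run` cannot forget the listener call, since the branch only has to produce a state.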



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-20 Thread via GitHub


kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1923122101


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -181,6 +182,7 @@ class DagRun(BaseModel):
 data_interval_end: UtcDateTime | None
 start_date: UtcDateTime
 end_date: UtcDateTime | None
+clear_number: int

Review Comment:
   I should have clarified better: I meant that `clear_number` on the `DagRun` 
runtime model feels odd, i.e. the model itself doesn't require it.
   
   Since `logical_date` can now be `None` too, based on the link below, I think 
you might need to refactor the logic for generating the DR uuid.
   
   https://lists.apache.org/thread/cknldkl9pmmzr1q7ot67wborzznlwrtv
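
The problem can be sketched as follows. This is an illustration, not the OpenLineage adapter's code: the namespace, both function names, and the `run_id`-based variant are assumptions, and the variant is only one hypothetical direction that relies on `run_id` always being set.

```python
import uuid
from datetime import datetime
from typing import Optional

# Hypothetical namespace, used only for this illustration.
NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "example/dag-run")


def uuid_from_logical_date(
    dag_id: str, logical_date: Optional[datetime], clear_number: int
) -> uuid.UUID:
    """Key built from (dag_id, logical_date, clear_number) only."""
    return uuid.uuid5(NAMESPACE, f"{dag_id}.{logical_date}.{clear_number}")


def uuid_with_run_id(dag_id: str, run_id: str, clear_number: int) -> uuid.UUID:
    """Hypothetical alternative: key on run_id instead of logical_date."""
    return uuid.uuid5(NAMESPACE, f"{dag_id}.{run_id}.{clear_number}")


# Two distinct runs of the same DAG, both without a logical date.
first = uuid_from_logical_date("example_dag", None, 0)
second = uuid_from_logical_date("example_dag", None, 0)
collides = first == second  # the key cannot tell the two runs apart

distinct = (
    uuid_with_run_id("example_dag", "manual_1", 0)
    != uuid_with_run_id("example_dag", "manual_2", 0)
)
```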
   



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-20 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1922607800


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -165,6 +165,7 @@ class TaskInstance(BaseModel):
 try_number: int
 map_index: int = -1
 hostname: str | None = None
+start_date: UtcDateTime

Review Comment:
   Ok - moved it into context instead.



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-20 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1922607530


##
airflow/executors/workloads.py:
##
@@ -75,6 +77,15 @@ def key(self) -> TaskInstanceKey:
 )
 
 
+class DagRun(BaseModel):
+id: int
+dag_id: str
+run_id: str
+logical_date: datetime
+data_interval_start: datetime
+data_interval_end: datetime

Review Comment:
   This one isn't needed now that the DR comes from the context; removed it.



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-20 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1922581314


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -181,6 +182,7 @@ class DagRun(BaseModel):
 data_interval_end: UtcDateTime | None
 start_date: UtcDateTime
 end_date: UtcDateTime | None
+clear_number: int

Review Comment:
   It is a part of the current DagRun model :) 
https://github.com/apache/airflow/blob/02d83b0768ff8ad3024b3a42cf8829789867861f/airflow/models/dagrun.py#L160
   
   We need it to properly generate DR uuid, so that events from different 
physical executions of a dag run aren't mixed up: 
https://github.com/apache/airflow/blob/90eae569db448bf24afd4b14505f055100c1193e/providers/src/airflow/providers/openlineage/plugins/adapter.py#L122
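
As a rough illustration of why `clear_number` matters here, the sketch below derives a deterministic id with `uuid.uuid5`. The namespace and the `dag_run_uuid` helper are hypothetical; the adapter's real implementation at the linked line is not reproduced.

```python
import uuid
from datetime import datetime, timezone

# Hypothetical namespace, used only for this illustration.
NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "example/dag-run")


def dag_run_uuid(dag_id: str, logical_date: datetime, clear_number: int) -> uuid.UUID:
    """Derive a stable id: same inputs give the same UUID on any worker."""
    key = f"{dag_id}.{logical_date.isoformat()}.{clear_number}"
    return uuid.uuid5(NAMESPACE, key)


logical_date = datetime(2025, 1, 20, tzinfo=timezone.utc)
original = dag_run_uuid("example_dag", logical_date, clear_number=0)
replayed = dag_run_uuid("example_dag", logical_date, clear_number=0)
after_clear = dag_run_uuid("example_dag", logical_date, clear_number=1)
```

Events emitted during one physical execution share `original`, while a clear-and-rerun of the same DAG run gets `after_clear`, so the two are not mixed up downstream.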



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-20 Thread via GitHub


kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1922064634


##
airflow/executors/workloads.py:
##
@@ -75,6 +77,15 @@ def key(self) -> TaskInstanceKey:
 )
 
 
+class DagRun(BaseModel):
+id: int
+dag_id: str
+run_id: str
+logical_date: datetime
+data_interval_start: datetime
+data_interval_end: datetime

Review Comment:
   Looks like a lot of classes are duplicated across this file & 
`airflow/api_fastapi/execution_api/datamodels`.
   
   We should figure this out with our packaging discussion if this entire file 
is going to be moved to a separate package.



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-20 Thread via GitHub


kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1922054772


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -181,6 +182,7 @@ class DagRun(BaseModel):
 data_interval_end: UtcDateTime | None
 start_date: UtcDateTime
 end_date: UtcDateTime | None
+clear_number: int

Review Comment:
   It's odd to have `clear_number` in the DagRun datamodel!
   
   What do we need this for? Can we get it from somewhere else?



Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-20 Thread via GitHub


kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1922049056


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -165,6 +165,7 @@ class TaskInstance(BaseModel):
     try_number: int
     map_index: int = -1
     hostname: str | None = None
+    start_date: UtcDateTime

Review Comment:
   You can get that from the code linked below; the TaskInstance won't have that field, so it can't be mandatory.
   
   
https://github.com/apache/airflow/blob/ee785a89ba27a59246cdfcc0d83129b691a39f3e/airflow/api_fastapi/execution_api/datamodels/taskinstance.py#L36-L51






Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-10 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1910383905


##
providers/tests/openlineage/plugins/test_listener.py:
##


Review Comment:
   Fixed this.






Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-10 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1910383560


##
providers/src/airflow/providers/openlineage/plugins/listener.py:
##
@@ -87,28 +88,58 @@ def __init__(self):
 self.extractor_manager = ExtractorManager()
 self.adapter = OpenLineageAdapter()
 
-@hookimpl
-def on_task_instance_running(
-self,
-previous_state: TaskInstanceState,
-task_instance: TaskInstance,
-session: Session,  # This will always be QUEUED
-) -> None:
-if not getattr(task_instance, "task", None) is not None:
-self.log.warning(
-"No task set for TI object task_id: %s - dag_id: %s - run_id 
%s",
-task_instance.task_id,
-task_instance.dag_id,
-task_instance.run_id,
-)
-return
+if AIRFLOW_V_3_0_PLUS:
 
-self.log.debug("OpenLineage listener got notification about task 
instance start")
-dagrun = task_instance.dag_run
-task = task_instance.task
-if TYPE_CHECKING:
-assert task
-dag = task.dag
+@hookimpl
+def on_task_instance_running(
+self,
+previous_state: TaskInstanceState,
+task_instance: RuntimeTaskInstance,
+):
+if not getattr(task_instance, "task", None) is not None:
+self.log.warning(
+"No task set for TI object task_id: %s - dag_id: %s - 
run_id %s",
+task_instance.task_id,
+task_instance.dag_id,
+task_instance.run_id,
+)
+return

Review Comment:
   Actually nope - `RuntimeTaskInstance` prevents that. So, removed that check.



##
providers/src/airflow/providers/openlineage/plugins/listener.py:
##
@@ -127,35 +158,34 @@ def on_task_instance_running(
 return
 
 # Needs to be calculated outside of inner method so that it gets 
cached for usage in fork processes
+data_interval_start = dagrun.data_interval_start
+if isinstance(data_interval_start, datetime):
+data_interval_start = data_interval_start.isoformat()
+data_interval_end = dagrun.data_interval_end
+if isinstance(data_interval_end, datetime):
+data_interval_end = data_interval_end.isoformat()
+
 debug_facet = get_airflow_debug_facet()
 
 @print_warning(self.log)
 def on_running():
-# that's a workaround to detect task running from deferred state
-# we return here because Airflow 2.3 needs task from deferred state
-if task_instance.next_method is not None:
-return
-
-if is_ti_rescheduled_already(task_instance):
+context = task_instance.get_template_context()
+if hasattr(context, "task_reschedule_count") and 
context["task_reschedule_count"] > 0:
 self.log.debug("Skipping this instance of rescheduled task - 
START event was emitted already")
 return
 
 parent_run_id = self.adapter.build_dag_run_id(
 dag_id=dag.dag_id,
 logical_date=dagrun.logical_date,
-clear_number=dagrun.clear_number,
+clear_number=0,

Review Comment:
   Fixed.



##
providers/tests/openlineage/extractors/test_manager.py:
##
@@ -324,3 +347,123 @@ def use_read():
 
 assert len(datasets.outputs) == 1
 assert datasets.outputs[0].asset == Asset(uri=path)
+
+
+@pytest.fixture
+def mock_supervisor_comms():
+with mock.patch(
+"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+) as supervisor_comms:
+yield supervisor_comms
+
+
+@pytest.fixture
+def mocked_parse(spy_agency):
+"""
+Fixture to set up an inline DAG and use it in a stubbed `parse` function. 
Use this fixture if you
+want to isolate and test `parse` or `run` logic without having to define a 
DAG file.
+
+This fixture returns a helper function `set_dag` that:
+1. Creates an in line DAG with the given `dag_id` and `task` (limited to 
one task)
+2. Constructs a `RuntimeTaskInstance` based on the provided 
`StartupDetails` and task.
+3. Stubs the `parse` function using `spy_agency`, to return the mocked 
`RuntimeTaskInstance`.
+
+After adding the fixture in your test function signature, you can use it 
like this ::
+
+mocked_parse(
+StartupDetails(
+ti=TaskInstance(id=uuid7(), task_id="hello", 
dag_id="super_basic_run", run_id="c", try_number=1),
+file="",
+requests_fd=0,
+),
+"example_dag_id",
+CustomOperator(task_id="hello"),
+)
+"""
+
+def set_dag(what: StartupDetails, dag_id: str, task: BaseOperator) -> 
Ru

Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-10 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1910383193


##
providers/tests/openlineage/plugins/test_execution.py:
##
@@ -116,6 +116,7 @@ def test_not_stalled_task_emits_proper_lineage(self):
 self.setup_job(task_name, run_id)
 
 events = get_sorted_events(tmp_dir)
+log.error(events)

Review Comment:
   Removed.






Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-10 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1910371400


##
providers/src/airflow/providers/openlineage/plugins/listener.py:
##
@@ -127,35 +158,34 @@ def on_task_instance_running(
 return
 
 # Needs to be calculated outside of inner method so that it gets 
cached for usage in fork processes
+data_interval_start = dagrun.data_interval_start
+if isinstance(data_interval_start, datetime):
+data_interval_start = data_interval_start.isoformat()
+data_interval_end = dagrun.data_interval_end
+if isinstance(data_interval_end, datetime):
+data_interval_end = data_interval_end.isoformat()
+
 debug_facet = get_airflow_debug_facet()
 
 @print_warning(self.log)
 def on_running():
-# that's a workaround to detect task running from deferred state
-# we return here because Airflow 2.3 needs task from deferred state
-if task_instance.next_method is not None:
-return
-
-if is_ti_rescheduled_already(task_instance):
+context = task_instance.get_template_context()
+if hasattr(context, "task_reschedule_count") and 
context["task_reschedule_count"] > 0:
 self.log.debug("Skipping this instance of rescheduled task - 
START event was emitted already")
 return

Review Comment:
   I think the context will eventually be cached in Airflow 3; it's essentially a static dict at this point. I would leave optimizations for later 🙂






Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-10 Thread via GitHub


kacpermuda commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1909253802


##
providers/tests/openlineage/plugins/test_execution.py:
##
@@ -116,6 +116,7 @@ def test_not_stalled_task_emits_proper_lineage(self):
 self.setup_job(task_name, run_id)
 
 events = get_sorted_events(tmp_dir)
+log.error(events)

Review Comment:
   Some leftover from testing?



##
providers/src/airflow/providers/openlineage/plugins/listener.py:
##
@@ -127,35 +158,34 @@ def on_task_instance_running(
 return
 
 # Needs to be calculated outside of inner method so that it gets 
cached for usage in fork processes
+data_interval_start = dagrun.data_interval_start
+if isinstance(data_interval_start, datetime):
+data_interval_start = data_interval_start.isoformat()
+data_interval_end = dagrun.data_interval_end
+if isinstance(data_interval_end, datetime):
+data_interval_end = data_interval_end.isoformat()
+
 debug_facet = get_airflow_debug_facet()
 
 @print_warning(self.log)
 def on_running():
-# that's a workaround to detect task running from deferred state
-# we return here because Airflow 2.3 needs task from deferred state
-if task_instance.next_method is not None:
-return
-
-if is_ti_rescheduled_already(task_instance):
+context = task_instance.get_template_context()
+if hasattr(context, "task_reschedule_count") and 
context["task_reschedule_count"] > 0:
 self.log.debug("Skipping this instance of rescheduled task - 
START event was emitted already")
 return
 
 parent_run_id = self.adapter.build_dag_run_id(
 dag_id=dag.dag_id,
 logical_date=dagrun.logical_date,
-clear_number=dagrun.clear_number,
+clear_number=0,

Review Comment:
   Why is it always set to 0?
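
To illustrate why the hard-coded value matters, here is a minimal sketch. It assumes the parent run ID is derived deterministically from `dag_id`, `logical_date`, and `clear_number`, as the keyword arguments in the hunk suggest. `sketch_dag_run_id` and its UUIDv5 scheme are hypothetical stand-ins, not the adapter's actual `build_dag_run_id` implementation.

```python
import uuid
from datetime import datetime, timezone


def sketch_dag_run_id(dag_id: str, logical_date: datetime, clear_number: int) -> str:
    """Hypothetical deterministic run ID; not the OpenLineage adapter's real algorithm."""
    key = f"{dag_id}.{logical_date.isoformat()}.{clear_number}"
    return str(uuid.uuid5(uuid.NAMESPACE_URL, key))


logical_date = datetime(2025, 1, 10, tzinfo=timezone.utc)

# With clear_number pinned to 0, a cleared and re-run DAG run maps to the
# same ID as the original run.
first_run = sketch_dag_run_id("example_dag", logical_date, clear_number=0)
rerun_pinned = sketch_dag_run_id("example_dag", logical_date, clear_number=0)

# Passing the real counter keeps the two runs distinguishable.
rerun_counted = sketch_dag_run_id("example_dag", logical_date, clear_number=1)
```

Under that assumption, task events emitted after a clear would be attached to the same parent run ID as the original attempt, which is presumably what the question is getting at.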



##
providers/src/airflow/providers/openlineage/plugins/listener.py:
##
@@ -127,35 +158,34 @@ def on_task_instance_running(
 return
 
 # Needs to be calculated outside of inner method so that it gets 
cached for usage in fork processes
+data_interval_start = dagrun.data_interval_start
+if isinstance(data_interval_start, datetime):
+data_interval_start = data_interval_start.isoformat()
+data_interval_end = dagrun.data_interval_end
+if isinstance(data_interval_end, datetime):
+data_interval_end = data_interval_end.isoformat()
+
 debug_facet = get_airflow_debug_facet()
 
 @print_warning(self.log)
 def on_running():
-# that's a workaround to detect task running from deferred state
-# we return here because Airflow 2.3 needs task from deferred state
-if task_instance.next_method is not None:
-return
-
-if is_ti_rescheduled_already(task_instance):
+context = task_instance.get_template_context()
+if hasattr(context, "task_reschedule_count") and 
context["task_reschedule_count"] > 0:
 self.log.debug("Skipping this instance of rescheduled task - 
START event was emitted already")
 return

Review Comment:
   Since it's skipping the entire event emission, maybe we can move it somewhere higher up? We already have the template context in `on_task_instance_running` for Airflow 3, so we could use it there instead of re-creating it just for this purpose. I'm not sure how heavy the operation of getting the template context is.
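
One detail in the quoted condition may be worth checking while moving it. The sketch below assumes the template context behaves like a plain `dict` (the thread only calls it "essentially static dict", so this is an assumption). In that case `hasattr(context, "task_reschedule_count")` tests for an attribute rather than a key, and the guard would never fire. The helper names are hypothetical.

```python
# Stand-in for the template context; assumed here to be a plain dict.
context = {"task_reschedule_count": 2}


def was_rescheduled_hasattr(ctx) -> bool:
    # Mirrors the check in the diff: hasattr() looks up attributes, not keys,
    # so for a plain dict this short-circuits to False.
    return hasattr(ctx, "task_reschedule_count") and ctx["task_reschedule_count"] > 0


def was_rescheduled_get(ctx) -> bool:
    # Key-based lookup with a default of 0 when the key is absent.
    return ctx.get("task_reschedule_count", 0) > 0
```

If the context is computed once in `on_task_instance_running`, a key-based check like `was_rescheduled_get` could run there before any event-building work starts.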



##
providers/tests/openlineage/plugins/test_listener.py:
##


Review Comment:
   nit: There are places in this file where we condition the whole test class on Airflow 2 or 3 and then still condition things on the `AIRFLOW_V_3_0_PLUS` variable within the test cases belonging to that class. We can probably remove those additional conditions if the whole test class will be skipped anyway.
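
A minimal sketch of the redundancy, using the standard library's `unittest.skipIf` as a stand-in for whatever skip markers the test file actually uses. The flag value, class name, and `run_suite` helper are hypothetical.

```python
import unittest

# Hypothetical stand-in for the provider's version flag.
AIRFLOW_V_3_0_PLUS = False


@unittest.skipIf(not AIRFLOW_V_3_0_PLUS, "Airflow 3 only")
class TestListenerAirflow3(unittest.TestCase):
    def test_running_event(self):
        # The class-level skip already guarantees the flag is True here, so an
        # inner `if AIRFLOW_V_3_0_PLUS: ... else: ...` would carry a dead branch.
        self.assertTrue(AIRFLOW_V_3_0_PLUS)


def run_suite() -> unittest.TestResult:
    """Run the test class in-process and return the collected result."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestListenerAirflow3)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

With the flag set to `False`, the test body never executes, which is why a second check inside it adds nothing.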



##
providers/src/airflow/providers/openlineage/plugins/listener.py:
##
@@ -87,28 +88,58 @@ def __init__(self):
 self.extractor_manager = ExtractorManager()
 self.adapter = OpenLineageAdapter()
 
-@hookimpl
-def on_task_instance_running(
-self,
-previous_state: TaskInstanceState,
-task_instance: TaskInstance,
-session: Session,  # This will always be QUEUED
-) -> None:
-if not getattr(task_instance, "task", None) is not None:
-self.log.warning(
-"No task set for TI object task_id: %s - dag_id: %s - run_id 
%s",
-task_instance.task_id,
-task_instance.dag_id,
-task_instance.run_id,
-)
-return
+if AIRFLOW_V_3_0_PLUS:
 
-self.log.debug("OpenLineag

Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-08 Thread via GitHub


mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1907446623


##
airflow/api_fastapi/execution_api/datamodels/dagrun.py:
##
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This model is not used in the API, but it is included in generated OpenAPI schema
+# for use in the client SDKs.
+from __future__ import annotations
+
+from airflow.api_fastapi.common.types import UtcDateTime
+from airflow.api_fastapi.core_api.base import BaseModel
+
+
+class DagRun(BaseModel):
+    """Schema for TaskInstance model with minimal required fields needed for OL for now."""
+
+    id: int
+    dag_id: str
+    run_id: str
+    logical_date: UtcDateTime
+    data_interval_start: UtcDateTime
+    data_interval_end: UtcDateTime
+    clear_number: int

Review Comment:
   Removed this one - it existed because DR wasn't used yet in the API when I first created this. Now that it's used and OpenAPI generates it nicely, there's no point in having it here.






Re: [PR] AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible [airflow]

2025-01-08 Thread via GitHub


kaxil commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1907362185


##
airflow/api_fastapi/execution_api/datamodels/dagrun.py:
##
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This model is not used in the API, but it is included in generated OpenAPI schema
+# for use in the client SDKs.
+from __future__ import annotations
+
+from airflow.api_fastapi.common.types import UtcDateTime
+from airflow.api_fastapi.core_api.base import BaseModel
+
+
+class DagRun(BaseModel):
+    """Schema for TaskInstance model with minimal required fields needed for OL for now."""
+
+    id: int
+    dag_id: str
+    run_id: str
+    logical_date: UtcDateTime
+    data_interval_start: UtcDateTime
+    data_interval_end: UtcDateTime
+    clear_number: int

Review Comment:
   We already have a DagRun model in `airflow/api_fastapi/execution_api/datamodels/taskinstance.py`.
   
   Let's move that here or nuke this one -- I am fine with either :)


