Re: [PR] AIP-82 Introduce `BaseEventTrigger` as base class for triggers used with event driven scheduling [airflow]

2025-02-13 Thread via GitHub


vincbeck merged PR #46391:
URL: https://github.com/apache/airflow/pull/46391


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] AIP-82 Introduce `BaseEventTrigger` as base class for triggers used with event driven scheduling [airflow]

2025-02-12 Thread via GitHub


vincbeck commented on code in PR #46391:
URL: https://github.com/apache/airflow/pull/46391#discussion_r1952931812


##
providers/standard/src/airflow/providers/standard/triggers/file.py:
##
@@ -75,3 +84,47 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]:
 yield TriggerEvent(True)
 return
 await asyncio.sleep(self.poke_interval)
+
+
+class FileDeleteTrigger(BaseEventTrigger):
+    """
+    A trigger that fires exactly once after it finds the requested file and then delete the file.
+
+    The difference between ``FileTrigger`` and ``FileDeleteTrigger`` is ``FileDeleteTrigger`` can only find a
+    specific file.
+
+    :param filepath: File (relative to the base path set within the connection).
+    :param poke_interval: Time that the job should wait in between each try
+    """
+
+    def __init__(
+        self,
+        filepath: str,
+        poke_interval: float = 5.0,
+        **kwargs,
+    ):
+        super().__init__()
+        self.filepath = filepath
+        self.poke_interval = poke_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize FileDeleteTrigger arguments and classpath."""
+        return (
+            "airflow.providers.standard.triggers.file.FileDeleteTrigger",
+            {
+                "filepath": self.filepath,
+                "poke_interval": self.poke_interval,
+            },
+        )
+
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        """Loop until the relevant file is found."""
+        while True:
+            if os.path.isfile(self.filepath):
+                mod_time_f = os.path.getmtime(self.filepath)
+                mod_time = datetime.datetime.fromtimestamp(mod_time_f).strftime("%Y%m%d%H%M%S")
+                self.log.info("Found file %s last modified: %s", self.filepath, mod_time)
+                os.remove(self.filepath)

Review Comment:
   Sounds good to me
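   For context on the `serialize()` method in the hunk above: it returns a `(classpath, kwargs)` pair, and the triggerer is assumed here to rebuild the trigger by importing the classpath and calling the class with those kwargs. The sketch below reproduces that round trip with the standard library only. `DemoFileDeleteTrigger`, `rebuild`, and the `example_module` path are hypothetical stand-ins, not Airflow code.

```python
from typing import Any


class DemoFileDeleteTrigger:
    """Hypothetical stand-in with the same constructor and serialize() shape as FileDeleteTrigger."""

    def __init__(self, filepath: str, poke_interval: float = 5.0, **kwargs: Any) -> None:
        self.filepath = filepath
        self.poke_interval = poke_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # A dotted classpath plus exactly the kwargs needed to call __init__ again.
        return (
            "example_module.DemoFileDeleteTrigger",  # hypothetical module path
            {"filepath": self.filepath, "poke_interval": self.poke_interval},
        )


def rebuild(trigger_cls: type, kwargs: dict[str, Any]) -> Any:
    # Assumed triggerer behaviour: import the classpath, then call cls(**kwargs).
    # The class is passed in directly here to keep the sketch import-free.
    return trigger_cls(**kwargs)


original = DemoFileDeleteTrigger("/tmp/watched.txt", poke_interval=2.0)
classpath, kwargs = original.serialize()
restored = rebuild(DemoFileDeleteTrigger, kwargs)
print(classpath, restored.filepath, restored.poke_interval)
```

   Anything passed only through `**kwargs` is not part of the serialized dict, so under this assumption it would not survive the round trip.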




Re: [PR] AIP-82 Introduce `BaseEventTrigger` as base class for triggers used with event driven scheduling [airflow]

2025-02-12 Thread via GitHub


vincbeck commented on code in PR #46391:
URL: https://github.com/apache/airflow/pull/46391#discussion_r1952929737


##
airflow/triggers/base.py:
##
@@ -51,7 +52,7 @@ class StartTriggerArgs:
 
 class BaseTrigger(abc.ABC, LoggingMixin):
     """
-    Base class for all triggers.
+    Base class for triggers used to defer tasks.

Review Comment:
   Good point. The answer is yes, so I guess I can rephrase it. Good catch!
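   The "yes" is consistent with `BaseEventTrigger` being a subclass of `BaseTrigger`, which the sketch below assumes: anything that accepts a `BaseTrigger` for deferral then accepts an event trigger as well, so describing `BaseTrigger` as the base class only for deferral triggers no longer marks a real distinction. The classes are simplified stand-ins, and `defer` is a hypothetical hook, not an Airflow API.

```python
import abc


class BaseTrigger(abc.ABC):
    """Simplified stand-in for airflow.triggers.base.BaseTrigger."""

    @abc.abstractmethod
    def serialize(self) -> tuple[str, dict]:
        ...


class BaseEventTrigger(BaseTrigger):
    """Stand-in: assumed to extend BaseTrigger without removing any capability."""


class FileDeleteTrigger(BaseEventTrigger):
    def __init__(self, filepath: str) -> None:
        self.filepath = filepath

    def serialize(self) -> tuple[str, dict]:
        return ("hypothetical.FileDeleteTrigger", {"filepath": self.filepath})


def defer(trigger: BaseTrigger) -> str:
    # Hypothetical deferral hook: it only requires a BaseTrigger, so an event
    # trigger passes the same check as any other trigger.
    if not isinstance(trigger, BaseTrigger):
        raise TypeError("expected a BaseTrigger")
    classpath, _ = trigger.serialize()
    return classpath


print(defer(FileDeleteTrigger("/tmp/watched.txt")))  # hypothetical.FileDeleteTrigger
```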




Re: [PR] AIP-82 Introduce `BaseEventTrigger` as base class for triggers used with event driven scheduling [airflow]

2025-02-12 Thread via GitHub


vincbeck commented on code in PR #46391:
URL: https://github.com/apache/airflow/pull/46391#discussion_r1952928869


##
airflow/example_dags/example_asset_with_watchers.py:
##
@@ -52,8 +39,7 @@ def create_file():
 ):
 
     @task
-    def delete_file():
-        if os.path.exists(file_path):
-            os.remove(file_path)  # Delete the file
+    def test_task():
+        pass

Review Comment:
   Sounds good to me




Re: [PR] AIP-82 Introduce `BaseEventTrigger` as base class for triggers used with event driven scheduling [airflow]

2025-02-11 Thread via GitHub


Lee-W commented on code in PR #46391:
URL: https://github.com/apache/airflow/pull/46391#discussion_r1951883198


##
airflow/example_dags/example_asset_with_watchers.py:
##
@@ -52,8 +39,7 @@ def create_file():
 ):
 
     @task
-    def delete_file():
-        if os.path.exists(file_path):
-            os.remove(file_path)  # Delete the file
+    def test_task():
+        pass

Review Comment:
   nit: Should we at least print or log something indicating that the file has been deleted, to make the example easier to understand?



##
providers/standard/src/airflow/providers/standard/triggers/file.py:
##
@@ -75,3 +84,47 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]:
 yield TriggerEvent(True)
 return
 await asyncio.sleep(self.poke_interval)
+
+
+class FileDeleteTrigger(BaseEventTrigger):
+    """
+    A trigger that fires exactly once after it finds the requested file and then delete the file.
+
+    The difference between ``FileTrigger`` and ``FileDeleteTrigger`` is ``FileDeleteTrigger`` can only find a
+    specific file.
+
+    :param filepath: File (relative to the base path set within the connection).
+    :param poke_interval: Time that the job should wait in between each try
+    """
+
+    def __init__(
+        self,
+        filepath: str,
+        poke_interval: float = 5.0,
+        **kwargs,
+    ):
+        super().__init__()
+        self.filepath = filepath
+        self.poke_interval = poke_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize FileDeleteTrigger arguments and classpath."""
+        return (
+            "airflow.providers.standard.triggers.file.FileDeleteTrigger",
+            {
+                "filepath": self.filepath,
+                "poke_interval": self.poke_interval,
+            },
+        )
+
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        """Loop until the relevant file is found."""
+        while True:
+            if os.path.isfile(self.filepath):
+                mod_time_f = os.path.getmtime(self.filepath)
+                mod_time = datetime.datetime.fromtimestamp(mod_time_f).strftime("%Y%m%d%H%M%S")
+                self.log.info("Found file %s last modified: %s", self.filepath, mod_time)
+                os.remove(self.filepath)

Review Comment:
   maybe let's add a log after removal?
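   A minimal sketch of the polling loop with the extra log line placed after `os.remove`, so it is only emitted once the removal succeeded. `watch_and_delete` is a hypothetical stand-in for `FileDeleteTrigger.run()` that yields the path instead of a `TriggerEvent`, and it drops the modification-time lookup to stay short.

```python
import asyncio
import logging
import os
import tempfile
from collections.abc import AsyncIterator

log = logging.getLogger(__name__)


async def watch_and_delete(filepath: str, poke_interval: float = 5.0) -> AsyncIterator[str]:
    """Hypothetical stand-in for FileDeleteTrigger.run(): yields once, after deleting the file."""
    while True:
        if os.path.isfile(filepath):
            log.info("Found file %s", filepath)
            os.remove(filepath)
            # The log suggested in the review, reached only if os.remove() did not raise.
            log.info("File %s has been deleted", filepath)
            yield filepath  # stands in for ``yield TriggerEvent(True)``
            return
        await asyncio.sleep(poke_interval)


async def demo() -> list[str]:
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "watched.txt")

        async def create_later() -> None:
            # Create the file after a short delay so the watcher polls a few times first.
            await asyncio.sleep(0.05)
            with open(path, "w") as f:
                f.write("ready")

        creator = asyncio.create_task(create_later())
        events = [event async for event in watch_and_delete(path, poke_interval=0.01)]
        await creator
        assert not os.path.exists(path)
        return events


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(asyncio.run(demo()))
```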



##
airflow/triggers/base.py:
##
@@ -51,7 +52,7 @@ class StartTriggerArgs:
 
 class BaseTrigger(abc.ABC, LoggingMixin):
     """
-    Base class for all triggers.
+    Base class for triggers used to defer tasks.

Review Comment:
   Can `BaseEventTrigger` also be used to defer tasks?




Re: [PR] AIP-82 Introduce `BaseEventTrigger` as base class for triggers used with event driven scheduling [airflow]

2025-02-10 Thread via GitHub


vincbeck commented on PR #46391:
URL: https://github.com/apache/airflow/pull/46391#issuecomment-2648201890

   Anyone? @ashb @Lee-W ?



Re: [PR] AIP-82 Introduce `BaseEventTrigger` as base class for triggers used with event driven scheduling [airflow]

2025-02-05 Thread via GitHub


vincbeck commented on code in PR #46391:
URL: https://github.com/apache/airflow/pull/46391#discussion_r1943416683


##
providers/standard/src/airflow/providers/standard/triggers/file.py:
##
@@ -75,3 +86,50 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]:
 yield TriggerEvent(True)
 return
 await asyncio.sleep(self.poke_interval)
+
+
+class FileDeleteTrigger(BaseEventTrigger):
+    """
+    A trigger that fires exactly once after it finds the requested file and then delete the file.
+
+    The difference between ``FileTrigger`` and ``FileDeleteTrigger`` is ``FileDeleteTrigger`` can only find a
+    specific file. When this file is found out, ``FileDeleteTrigger`` return the content of this file after
+    deleting it.
+
+    :param filepath: File (relative to the base path set within the connection).
+    :param poke_interval: Time that the job should wait in between each try
+    """
+
+    def __init__(
+        self,
+        filepath: str,
+        poke_interval: float = 5.0,
+        **kwargs,
+    ):
+        super().__init__()
+        self.filepath = filepath
+        self.poke_interval = poke_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize FileDeleteTrigger arguments and classpath."""
+        return (
+            "airflow.providers.standard.triggers.file.FileDeleteTrigger",
+            {
+                "filepath": self.filepath,
+                "poke_interval": self.poke_interval,
+            },
+        )
+
+    async def run(self) -> typing.AsyncIterator[TriggerEvent]:

Review Comment:
   Good catch!




Re: [PR] AIP-82 Introduce `BaseEventTrigger` as base class for triggers used with event driven scheduling [airflow]

2025-02-04 Thread via GitHub


ashb commented on code in PR #46391:
URL: https://github.com/apache/airflow/pull/46391#discussion_r1942016022


##
providers/standard/src/airflow/providers/standard/triggers/file.py:
##
@@ -75,3 +86,50 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]:
 yield TriggerEvent(True)
 return
 await asyncio.sleep(self.poke_interval)
+
+
+class FileDeleteTrigger(BaseEventTrigger):
+    """
+    A trigger that fires exactly once after it finds the requested file and then delete the file.
+
+    The difference between ``FileTrigger`` and ``FileDeleteTrigger`` is ``FileDeleteTrigger`` can only find a
+    specific file. When this file is found out, ``FileDeleteTrigger`` return the content of this file after
+    deleting it.
+
+    :param filepath: File (relative to the base path set within the connection).
+    :param poke_interval: Time that the job should wait in between each try
+    """
+
+    def __init__(
+        self,
+        filepath: str,
+        poke_interval: float = 5.0,
+        **kwargs,
+    ):
+        super().__init__()
+        self.filepath = filepath
+        self.poke_interval = poke_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize FileDeleteTrigger arguments and classpath."""
+        return (
+            "airflow.providers.standard.triggers.file.FileDeleteTrigger",
+            {
+                "filepath": self.filepath,
+                "poke_interval": self.poke_interval,
+            },
+        )
+
+    async def run(self) -> typing.AsyncIterator[TriggerEvent]:

Review Comment:
   Nit: we should be using the `collections.abc` version of this, not the `typing` one.
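   For reference, a small sketch of the suggested style: since Python 3.9, `collections.abc.AsyncIterator` can be subscripted directly and `typing.AsyncIterator` is documented as a deprecated alias, so only the import changes while the annotation on an async generator stays the same. `countdown` and `collect` are illustrative names, not part of the PR.

```python
import asyncio
from collections.abc import AsyncIterator  # instead of typing.AsyncIterator


async def countdown(n: int) -> AsyncIterator[int]:
    # An async generator annotated with the collections.abc type, as in ``run()``.
    while n > 0:
        yield n
        n -= 1
        await asyncio.sleep(0)


async def collect() -> list[int]:
    return [value async for value in countdown(3)]


print(asyncio.run(collect()))  # [3, 2, 1]
```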


