Re: [PR] Adding `TriggerWatermarks` model to Airflow [airflow]

2025-09-25 Thread via GitHub


jroachgolf84 closed pull request #55216: Adding `TriggerWatermarks` model to Airflow
URL: https://github.com/apache/airflow/pull/55216


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Adding `TriggerWatermarks` model to Airflow [airflow]

2025-09-04 Thread via GitHub


gyli commented on code in PR #55216:
URL: https://github.com/apache/airflow/pull/55216#discussion_r2320764365


##
airflow-core/src/airflow/models/trigger.py:
##
@@ -390,6 +390,142 @@ def get_sorted_triggers(cls, capacity: int, alive_triggerer_ids: list[int] | Sel
         return result
 
 
+class TriggerWatermark(Base):
+
+    __tablename__ = "trigger_watermark"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+
+    # Leverage the
+    trigger_hash = Column(
+        String(length=1500).with_variant(
+            String(
+                length=1500,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation="latin1_general_cs",
+            ),
+            "mysql",
+        ),
+        nullable=False,
+    )

Review Comment:
   I think this uses the existing code [here](https://github.com/gyli/airflow/blob/main/airflow-core/src/airflow/models/asset.py#L156) as its example.
   
   However, unlike the asset URI, I think we should allow UTF-8 characters in the `key` and `value` columns. Both the watermark name and its value can be arbitrary strings that go beyond the latin1 range.
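A quick stdlib illustration of the concern: the hypothetical helper `fits_latin1` reports whether a string survives latin1 encoding. It uses Python's ISO-8859-1 codec as an approximation (MySQL's `latin1` is closer to cp1252, but neither covers CJK text), and the sample strings are invented. An ASCII hash and an accented key fit, while a watermark value containing Japanese characters does not.

```python
def fits_latin1(text: str) -> bool:
    """Hypothetical helper: True if ``text`` can be encoded as ISO-8859-1 (latin1)."""
    try:
        text.encode("latin-1")
    except UnicodeEncodeError:
        return False
    return True


# Invented sample values for the three columns discussed in the review.
samples = {
    "trigger_hash": "3f2a9c1e",
    "key": "café_last_seen",
    "value": "ファイル_2025-09-04.csv",
}

for column, text in samples.items():
    # Every str here is UTF-8 encodable; only some are latin1 encodable.
    print(f"{column}: latin1={fits_latin1(text)}, utf8_bytes={len(text.encode('utf-8'))}")
```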






Re: [PR] Adding `TriggerWatermarks` model to Airflow [airflow]

2025-09-03 Thread via GitHub


jroachgolf84 commented on PR #55216:
URL: https://github.com/apache/airflow/pull/55216#issuecomment-3249631065

   A change like this will be needed for the Trigger to be able to read from and write to the `TriggerWatermarks` model.
   
   https://github.com/apache/airflow/pull/53514/files





Re: [PR] Adding `TriggerWatermarks` model to Airflow [airflow]

2025-09-03 Thread via GitHub


ephraimbuddy commented on code in PR #55216:
URL: https://github.com/apache/airflow/pull/55216#discussion_r2319264680


##
airflow-core/src/airflow/models/trigger.py:
##
@@ -390,6 +390,142 @@ def get_sorted_triggers(cls, capacity: int, alive_triggerer_ids: list[int] | Sel
         return result
 
 
+class TriggerWatermark(Base):
+
+    __tablename__ = "trigger_watermark"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+
+    # Leverage the
+    trigger_hash = Column(
+        String(length=1500).with_variant(
+            String(
+                length=1500,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation="latin1_general_cs",
+            ),
+            "mysql",
+        ),
+        nullable=False,
+    )
+
+    key = Column(
+        String(length=1500).with_variant(
+            String(
+                length=1500,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation="latin1_general_cs",
+            ),
+            "mysql",
+        ),
+        nullable=False,
+    )
+
+    # The value is stored as a string. We'll need to have some sort of a "serialization" method that
+    # handles this
+    value = Column(
+        String(length=1500).with_variant(
+            String(
+                length=1500,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation="latin1_general_cs",
+            ),
+            "mysql",
+        ),
+        nullable=False,
+    )
+
+    @classmethod
+    @provide_session
+    def set(
+        cls,
+        trigger_hash: str,
+        key: str,
+        value: Any = None,
+        session: Session = NEW_SESSION,
+    ):
+        if not trigger_hash:
+            raise ValueError(
+                "Invalid params. A ``trigger_hash`` of type ``str`` must be "
+                "passed to ``set``. This must be done in the ``set_watermark`` "
+                "method of your Trigger."
+            )
+
+        if not key:
+            raise ValueError(
+                "Invalid params. A ``key`` of type ``str`` must be "
+                "passed to ``set``. This must be done in the ``set_watermark`` "
+                "method of your Trigger."
+            )
+
+        # Query and delete the existing value
+        trigger_watermark = session.query(cls).filter(

Review Comment:
   Use SQLAlchemy 2.0+ query style






Re: [PR] Adding `TriggerWatermarks` model to Airflow [airflow]

2025-09-03 Thread via GitHub


ephraimbuddy commented on code in PR #55216:
URL: https://github.com/apache/airflow/pull/55216#discussion_r2319262432


##
airflow-core/src/airflow/models/trigger.py:
##
@@ -390,6 +390,142 @@ def get_sorted_triggers(cls, capacity: int, alive_triggerer_ids: list[int] | Sel
         return result
 
 
+class TriggerWatermark(Base):
+
+    __tablename__ = "trigger_watermark"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+
+    # Leverage the
+    trigger_hash = Column(
+        String(length=1500).with_variant(
+            String(
+                length=1500,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation="latin1_general_cs",
+            ),
+            "mysql",
+        ),
+        nullable=False,
+    )

Review Comment:
   Use `StringID` from `airflow.models.base` instead of handling the collation here.
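`StringID` is Airflow's own helper and its exact signature is not shown in this thread, so the sketch below does not reproduce it. It only illustrates the general idea behind the suggestion: define the MySQL-specific collation once in a (hypothetical) `ascii_id_string` helper and reuse it, rather than repeating the `with_variant(...)` block on every column. Compiling the type against two dialects shows that the `latin1_general_cs` collation is emitted only for MySQL.

```python
from sqlalchemy import Column, String
from sqlalchemy.dialects import mysql, sqlite


def ascii_id_string(length: int = 1500) -> String:
    """Hypothetical helper: a String that uses a case-sensitive latin1 collation on MySQL only."""
    return String(length).with_variant(
        String(length, collation="latin1_general_cs"),
        "mysql",
    )


# Define the type once and reuse it, instead of repeating the variant per column.
trigger_hash = Column("trigger_hash", ascii_id_string(), nullable=False)

# Render the column type as DDL for each dialect (no database connection needed).
print(trigger_hash.type.compile(dialect=mysql.dialect()))
print(trigger_hash.type.compile(dialect=sqlite.dialect()))
```

Given the review comment on `key` and `value`, a helper like this would only suit ASCII-only identifiers such as `trigger_hash`.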






Re: [PR] Adding `TriggerWatermarks` model to Airflow [airflow]

2025-09-03 Thread via GitHub


jroachgolf84 commented on PR #55216:
URL: https://github.com/apache/airflow/pull/55216#issuecomment-3249242645

   Right now, using `@provide_session` seems to be throwing an exception (see below). Going to try and figure out the best way around this one!
   
   ```
   2025-09-03 13:16:28 [error] Trigger ID 28 exited with error: 
   Direct database access via the ORM is not allowed in Airflow 3.0 
   [airflow.jobs.triggerer_job_runner] 
   
   error_detail = [
     {
       'exc_type': 'RuntimeError',
       'exc_value': 'Direct database access via the ORM is not allowed in Airflow 3.0',
       'exc_notes': [],
       'syntax_error': None,
       'is_cause': False,
       'frames': [
         {
           'filename': '/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py',
           'lineno': 965,
           'name': 'cleanup_finished_triggers'
         },
         {
           'filename': '/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py',
           'lineno': 1074,
           'name': 'run_trigger'
         },
         {
           'filename': '/opt/airflow/providers/standard/src/airflow/providers/standard/triggers/dummy.py',
           'lineno': 29,
           'name': 'run'
         },
         {
           'filename': '/opt/airflow/airflow-core/src/airflow/triggers/base.py',
           'lineno': 183,
           'name': 'get_watermark'
         },
         {
           'filename': '/opt/airflow/airflow-core/src/airflow/utils/session.py',
           'lineno': 99,
           'name': 'wrapper'
         },
         {
           'filename': '/usr/local/lib/python3.10/contextlib.py',
           'lineno': 135,
           'name': '__enter__'
         },
         {
           'filename': '/opt/airflow/airflow-core/src/airflow/utils/session.py',
           'lineno': 40,
           'name': 'create_session'
         },
         {
           'filename': '/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py',
           'lineno': 260,
           'name': '__init__'
         }
       ],
       'is_group': False,
       'exceptions': []
     }
   ]
   ```





[PR] Adding `TriggerWatermarks` model to Airflow [airflow]

2025-09-03 Thread via GitHub


jroachgolf84 opened a new pull request, #55216:
URL: https://github.com/apache/airflow/pull/55216

   This is a draft PR creating a `TriggerWatermarks` model to store watermarks. This is part of a larger effort (AIP-93) to unlock incremental event-based workloads.

