Re: [PR] Adding `TriggerWatermarks` model to Airflow [airflow]
jroachgolf84 closed pull request #55216: Adding `TriggerWatermarks` model to Airflow
URL: https://github.com/apache/airflow/pull/55216
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Adding `TriggerWatermarks` model to Airflow [airflow]
gyli commented on code in PR #55216:
URL: https://github.com/apache/airflow/pull/55216#discussion_r2320764365

## airflow-core/src/airflow/models/trigger.py: ##
@@ -390,6 +390,142 @@ def get_sorted_triggers(cls, capacity: int, alive_triggerer_ids: list[int] | Sel
         return result

+class TriggerWatermark(Base):
+
+    __tablename__ = "trigger_watermark"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+
+    # Leverage the
+    trigger_hash = Column(
+        String(length=1500).with_variant(
+            String(
+                length=1500,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation="latin1_general_cs",
+            ),
+            "mysql",
+        ),
+        nullable=False,
+    )

Review Comment:
I think it's using the existing code [here](https://github.com/gyli/airflow/blob/main/airflow-core/src/airflow/models/asset.py#L156) as the example. However, unlike the asset URI, I think we should allow UTF-8 characters in the `key` and `value` columns. Both the watermark name and its value can be arbitrary strings that go beyond the latin1 range.
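To make the latin1 concern concrete, here is a minimal sketch that checks which candidate watermark names or values would fit in a single-byte latin1 encoding. The helper `fits_latin1` and the sample strings are hypothetical and not part of the PR. Python's `latin-1` codec (ISO-8859-1) is used only as a rough proxy: MySQL's `latin1` character set is not identical to it, and how MySQL treats characters it cannot store depends on server settings.

```python
def fits_latin1(text: str) -> bool:
    """Return True if every character in `text` can be encoded as ISO-8859-1."""
    try:
        text.encode("latin-1")
    except UnicodeEncodeError:
        return False
    return True


# Hypothetical watermark keys/values, chosen only for illustration.
samples = ["last_modified", "café", "水位线", "offset-📈"]

# Map each sample to whether it stays within the latin1 range.
results = {sample: fits_latin1(sample) for sample in samples}

for sample, ok in results.items():
    print(f"{sample!r}: {'fits' if ok else 'outside latin1'}")
```

Plain ASCII and Western European text pass the check, while CJK text and emoji do not, which is the kind of input the review comment wants `key` and `value` to accept.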
Re: [PR] Adding `TriggerWatermarks` model to Airflow [airflow]
jroachgolf84 commented on PR #55216:
URL: https://github.com/apache/airflow/pull/55216#issuecomment-3249631065

A change like this will be needed for the Trigger to be able to read from and write to the `TriggerWatermarks` model: https://github.com/apache/airflow/pull/53514/files
Re: [PR] Adding `TriggerWatermarks` model to Airflow [airflow]
ephraimbuddy commented on code in PR #55216:
URL: https://github.com/apache/airflow/pull/55216#discussion_r2319264680

## airflow-core/src/airflow/models/trigger.py: ##
@@ -390,6 +390,142 @@ def get_sorted_triggers(cls, capacity: int, alive_triggerer_ids: list[int] | Sel
         return result

+class TriggerWatermark(Base):
+
+    __tablename__ = "trigger_watermark"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+
+    # Leverage the
+    trigger_hash = Column(
+        String(length=1500).with_variant(
+            String(
+                length=1500,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation="latin1_general_cs",
+            ),
+            "mysql",
+        ),
+        nullable=False,
+    )
+
+    key = Column(
+        String(length=1500).with_variant(
+            String(
+                length=1500,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation="latin1_general_cs",
+            ),
+            "mysql",
+        ),
+        nullable=False,
+    )
+
+    # The value is stored as a string. We'll need to have some sort of a "serialization" method that
+    # handles this
+    value = Column(
+        String(length=1500).with_variant(
+            String(
+                length=1500,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation="latin1_general_cs",
+            ),
+            "mysql",
+        ),
+        nullable=False,
+    )
+
+    @classmethod
+    @provide_session
+    def set(
+        cls,
+        trigger_hash: str,
+        key: str,
+        value: Any = None,
+        session: Session = NEW_SESSION,
+    ):
+        if not trigger_hash:
+            raise ValueError(
+                "Invalid params. A ``trigger_hash`` of type ``str`` must be "
+                "passed to ``set``. This must be done in the ``set_watermark`` "
+                "method of your Trigger."
+            )
+
+        if not key:
+            raise ValueError(
+                "Invalid params. A ``key`` of type ``str`` must be "
+                "passed to ``set``. This must be done in the ``set_watermark`` "
+                "method of your Trigger."
+            )
+
+        # Query and delete the existing value
+        trigger_watermark = session.query(cls).filter(

Review Comment:
Use the SQLAlchemy 2.0+ query style.
Re: [PR] Adding `TriggerWatermarks` model to Airflow [airflow]
ephraimbuddy commented on code in PR #55216:
URL: https://github.com/apache/airflow/pull/55216#discussion_r2319262432

## airflow-core/src/airflow/models/trigger.py: ##
@@ -390,6 +390,142 @@ def get_sorted_triggers(cls, capacity: int, alive_triggerer_ids: list[int] | Sel
         return result

+class TriggerWatermark(Base):
+
+    __tablename__ = "trigger_watermark"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+
+    # Leverage the
+    trigger_hash = Column(
+        String(length=1500).with_variant(
+            String(
+                length=1500,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation="latin1_general_cs",
+            ),
+            "mysql",
+        ),
+        nullable=False,
+    )

Review Comment:
Use `StringID` from `airflow.models.base` instead of handling the collation here.
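The general idea behind this suggestion, defining the column type once rather than repeating the collation logic per column, can be sketched without importing Airflow. The helper `ascii_string` below is hypothetical and is not Airflow's `StringID`, whose implementation and defaults are not shown in this thread; it only wraps the `with_variant` pattern from the diff so each column becomes a one-liner.

```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.dialects import mysql, sqlite
from sqlalchemy.orm import declarative_base


def ascii_string(length: int = 1500):
    """Hypothetical helper: a String type that uses a latin1 collation on MySQL only."""
    return String(length=length).with_variant(
        String(length=length, collation="latin1_general_cs"),
        "mysql",
    )


Base = declarative_base()


class WatermarkColumnsSketch(Base):
    """Illustrative model only; not the PR's TriggerWatermark."""

    __tablename__ = "watermark_columns_sketch"

    id = Column(Integer, primary_key=True, autoincrement=True)
    trigger_hash = Column(ascii_string(), nullable=False)
    key = Column(ascii_string(), nullable=False)


# Render the column type for two dialects to see where the collation applies.
column_type = WatermarkColumnsSketch.__table__.c.trigger_hash.type
mysql_ddl = column_type.compile(dialect=mysql.dialect())
sqlite_ddl = column_type.compile(dialect=sqlite.dialect())
print(mysql_ddl)
print(sqlite_ddl)
```

In the actual PR, the reviewer's suggestion would presumably replace such a helper with Airflow's own `StringID`, so the collation choice stays consistent with other ID columns in the metadata database.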
Re: [PR] Adding `TriggerWatermarks` model to Airflow [airflow]
jroachgolf84 commented on PR #55216:
URL: https://github.com/apache/airflow/pull/55216#issuecomment-3249242645
Right now, using `@provide_session` seems to be throwing an exception (see
below). Going to try and figure out the best way around this one!
```
2025-09-03 13:16:28 [error] Trigger ID 28 exited with error:
Direct database access via the ORM is not allowed in Airflow 3.0
[airflow.jobs.triggerer_job_runner]
error_detail = [
{
'exc_type': 'RuntimeError',
'exc_value': 'Direct database access via the ORM is not allowed in
Airflow 3.0',
'exc_notes': [],
'syntax_error': None,
'is_cause': False,
'frames': [
{
'filename':
'/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py',
'lineno': 965,
'name': 'cleanup_finished_triggers'
},
{
'filename':
'/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py',
'lineno': 1074,
'name': 'run_trigger'
},
{
'filename':
'/opt/airflow/providers/standard/src/airflow/providers/standard/triggers/dummy.py',
'lineno': 29,
'name': 'run'
},
{
'filename': '/opt/airflow/airflow-core/src/airflow/triggers/base.py',
'lineno': 183,
'name': 'get_watermark'
},
{
'filename': '/opt/airflow/airflow-core/src/airflow/utils/session.py',
'lineno': 99,
'name': 'wrapper'
},
{
'filename': '/usr/local/lib/python3.10/contextlib.py',
'lineno': 135,
'name': '__enter__'
},
{
'filename': '/opt/airflow/airflow-core/src/airflow/utils/session.py',
'lineno': 40,
'name': 'create_session'
},
{
'filename':
'/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py',
'lineno': 260,
'name': '__init__'
}
],
'is_group': False,
'exceptions': []
}
]
```
[PR] Adding `TriggerWatermarks` model to Airflow [airflow]
jroachgolf84 opened a new pull request, #55216:
URL: https://github.com/apache/airflow/pull/55216

This is a draft PR creating a `TriggerWatermarks` model to store watermarks. This is part of a larger effort (AIP-93) to unlock incremental event-based workloads.
