kaxil commented on code in PR #67718:
URL: https://github.com/apache/airflow/pull/67718#discussion_r3325544553
##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##########
@@ -161,6 +161,14 @@ def check_data_intervals(self):
return self
def validate_context(self, dag: SerializedDAG) -> dict:
+ if (
+ self.partition_key is not None
+ and not dag.timetable.partitioned
Review Comment:
`dag.timetable.partitioned` isn't defined on every timetable. The core
`Timetable` protocol (`timetables/base.py`) sets it, but the SDK
`BaseTimetable` that user-defined timetables subclass
(`task-sdk/.../sdk/bases/timetable.py`) defines only `partitioned_at_runtime`,
not `partitioned`. It is also a standalone class, not a subclass of the core
protocol. `decode_timetable` returns the user's own class for custom
timetables, so passing a `partition_key` to a DAG with a custom timetable
raises `AttributeError` here. Because `partition_key is not None` is true, the
`and` chain goes on to evaluate `dag.timetable.partitioned`. Since
`trigger_dag_run` only catches `(ParamValidationError, ValueError)`, the error
escapes as a 500, which is exactly the case this guard is meant to turn into a
400. `getattr(dag.timetable, "partitioned", False)` fixes it. Alternatively, add
`partitioned: bool = False` to `BaseTimetable` for symmetry with
`partitioned_at_runtime`.
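A minimal sketch of the failure and the `getattr` fix. It uses hypothetical stand-in classes (`CoreNullTimetable`, `CorePartitionedTimetable`, `SdkBaseTimetable`, `MyCustomTimetable`), a stub `ParamValidationError`, and a hypothetical `status_for` helper that roughly models the route's `except` clause. None of these are Airflow's real classes. The two guards mirror the condition in the diff.

```python
class ParamValidationError(Exception):
    """Hypothetical stand-in for Airflow's ParamValidationError."""


class CoreNullTimetable:
    # Stand-in for a core timetable: the core protocol defines both flags.
    partitioned = False
    partitioned_at_runtime = False


class CorePartitionedTimetable(CoreNullTimetable):
    # Stand-in for a core timetable that is partitioned.
    partitioned = True


class SdkBaseTimetable:
    # Stand-in for the SDK base class: only partitioned_at_runtime is defined.
    partitioned_at_runtime = False


class MyCustomTimetable(SdkBaseTimetable):
    """A user-defined timetable, as decode_timetable would hand it back."""


def validate_unsafe(partition_key, timetable):
    # Mirrors the guard in the diff: direct attribute access on `partitioned`.
    if (
        partition_key is not None
        and not timetable.partitioned
        and not timetable.partitioned_at_runtime
    ):
        raise ValueError("partition_key given for a non-partitioned DAG")


def validate_safe(partition_key, timetable):
    # getattr with a False default treats a missing flag as "not partitioned".
    if (
        partition_key is not None
        and not getattr(timetable, "partitioned", False)
        and not timetable.partitioned_at_runtime
    ):
        raise ValueError("partition_key given for a non-partitioned DAG")


def status_for(validate, timetable):
    """Rough model of the route: only these exception types become a 400."""
    try:
        validate("2024-01-01", timetable)
    except (ParamValidationError, ValueError):
        return 400
    return 200  # anything else propagates, which the app would report as a 500
```

With these stand-ins, `status_for(validate_unsafe, MyCustomTimetable())` lets the `AttributeError` escape, which is the 500 case. `status_for(validate_safe, MyCustomTimetable())` returns 400. Adding `partitioned = False` to the stand-in `SdkBaseTimetable` would make the unsafe version behave the same way, which corresponds to the `BaseTimetable` alternative.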
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py:
##########
@@ -2466,6 +2467,74 @@ def test_custom_timetable_generate_run_id_for_manual_trigger(self, dag_maker, te
run = session.scalars(select(DagRun).where(DagRun.run_id == run_id_without_logical_date)).one()
assert run.dag_id == custom_dag_id
+ def test_should_respond_400_when_partition_key_given_for_non_partitioned_dag(self, test_client):
Review Comment:
This uses `DAG1_ID`, which has `schedule=None` (`NullTimetable`, a core class
with `.partitioned` defined). It therefore doesn't cover a custom timetable
subclassing the SDK `BaseTimetable`, which is exactly the case that raises
`AttributeError` in the guard above. Adding a custom-timetable case here would
pin that path.
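A sketch of the coverage being asked for: one check per timetable family, with the custom SDK-style timetable as the case the existing test misses. Everything here is a hypothetical stand-in, including the classes, `check_partition_key` (written with the `getattr` fix suggested above), and `outcome`. It does not use Airflow's `dag_maker` or `test_client` fixtures. A real test would most likely need to register the custom timetable and POST to the trigger endpoint, asserting a 400.

```python
class CoreTimetable:
    # Stand-in for a core timetable such as NullTimetable: defines `partitioned`.
    partitioned = False
    partitioned_at_runtime = False


class SdkBaseTimetable:
    # Stand-in for the SDK BaseTimetable: no `partitioned` attribute at all.
    partitioned_at_runtime = False


class CustomTimetable(SdkBaseTimetable):
    """Hypothetical user timetable; the path the current test doesn't reach."""


def check_partition_key(partition_key, timetable):
    # Hypothetical guard, using getattr so a missing flag counts as False.
    if (
        partition_key is not None
        and not getattr(timetable, "partitioned", False)
        and not timetable.partitioned_at_runtime
    ):
        raise ValueError("partition_key given for a non-partitioned DAG")


def outcome(timetable):
    """Return the exception type a keyed trigger raises, or None if accepted."""
    try:
        check_partition_key("2024-01-01", timetable)
    except Exception as exc:  # capture whatever escapes, AttributeError included
        return type(exc)
    return None


# One case per timetable family.
CASES = [CoreTimetable(), CustomTimetable()]


def test_partition_key_rejected_for_every_timetable_family():
    for timetable in CASES:
        # ValueError is what the route maps to a 400; anything else would be a 500.
        assert outcome(timetable) is ValueError, type(timetable).__name__
```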
##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##########
@@ -161,6 +161,14 @@ def check_data_intervals(self):
return self
def validate_context(self, dag: SerializedDAG) -> dict:
+ if (
+ self.partition_key is not None
+ and not dag.timetable.partitioned
+ and not dag.timetable.partitioned_at_runtime
+ ):
+ raise ValueError(
Review Comment:
This rejection only runs on the public REST trigger path. The Execution API
route (`trigger_dag_run` in `execution_api/routes/dag_runs.py:144`) passes
`partition_key` straight into `trigger_dag` and then `create_dagrun` with no
partitionability check. A run of a non-partitioned DAG can therefore still be
created with a key there, and with the new inheritance that run then emits
partitioned-looking AssetEvents. `TriggerDagRunOperator` doesn't expose
`partition_key` today, so this isn't reachable through the shipped operator,
but the wire field is forwarded ungated. Putting the check in
`create_dagrun`/`DagRun.__init__` would cover both entrypoints instead of just
the request body.
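A minimal sketch of moving the check to a single choke point, assuming a simplified model. `create_run` stands in for `create_dagrun`/`DagRun.__init__`, and `public_trigger` and `execution_api_trigger` stand in for the two routes. `Timetable`, `DagRunRecord`, and all signatures are hypothetical and are not Airflow's real APIs.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Timetable:
    # Hypothetical stand-in carrying only the two flags the check reads.
    partitioned: bool = False
    partitioned_at_runtime: bool = False


@dataclass
class DagRunRecord:
    # Hypothetical, minimal stand-in for a created DagRun.
    dag_id: str
    partition_key: Optional[str]


def create_run(dag_id, timetable, partition_key=None):
    """Single choke point: every entrypoint creates runs through here."""
    if (
        partition_key is not None
        and not getattr(timetable, "partitioned", False)
        and not getattr(timetable, "partitioned_at_runtime", False)
    ):
        raise ValueError(f"DAG {dag_id!r} is not partitioned; partition_key is not allowed")
    return DagRunRecord(dag_id=dag_id, partition_key=partition_key)


def public_trigger(dag_id, timetable, body):
    # Public REST path: no route-level guard needed once create_run enforces it.
    return create_run(dag_id, timetable, body.get("partition_key"))


def execution_api_trigger(dag_id, timetable, payload):
    # Execution API path: forwards the wire field as-is, yet is still gated.
    return create_run(dag_id, timetable, payload.get("partition_key"))
```

Because both entrypoints funnel through `create_run`, a keyed trigger on a non-partitioned timetable raises `ValueError` either way. Any future caller that forwards `partition_key`, for example an operator that starts exposing it, would inherit the same check without having to repeat it.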
--
This is an automated message from the Apache Git Service.