kaxil commented on code in PR #67718:
URL: https://github.com/apache/airflow/pull/67718#discussion_r3325544553


##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##########
@@ -161,6 +161,14 @@ def check_data_intervals(self):
         return self
 
     def validate_context(self, dag: SerializedDAG) -> dict:
+        if (
+            self.partition_key is not None
+            and not dag.timetable.partitioned

Review Comment:
   `dag.timetable.partitioned` isn't defined on every timetable. The core 
`Timetable` protocol (`timetables/base.py`) sets it, but the SDK 
`BaseTimetable` that user-defined timetables subclass 
(`task-sdk/.../sdk/bases/timetable.py`) only defines `partitioned_at_runtime`, 
not `partitioned`, and it's a standalone class, not a subclass of the core 
protocol. `decode_timetable` returns the user's own class for custom 
timetables, so a `partition_key` passed against a DAG with a custom timetable 
raises `AttributeError` here as soon as the `and` chain evaluates 
`dag.timetable.partitioned`. Since `trigger_dag_run` only catches 
`(ParamValidationError, ValueError)`, that escapes as a 500, which is the 
exact case this guard is meant to turn into a 400. 
`getattr(dag.timetable, "partitioned", False)` fixes it, or add 
`partitioned: bool = False` to `BaseTimetable` for symmetry with 
`partitioned_at_runtime`.
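
   A minimal, self-contained sketch of the `getattr` variant, in case it helps. 
The two timetable classes and `check_partition_key` are hypothetical stand-ins 
rather than Airflow code, and the error message is a placeholder since the real 
one isn't shown in this hunk.

```python
class CoreLikeTimetable:
    """Stand-in for a core timetable: defines both flags."""

    partitioned = False
    partitioned_at_runtime = False


class SdkLikeTimetable:
    """Stand-in for an SDK-style custom timetable: no `partitioned` attribute."""

    partitioned_at_runtime = False


def check_partition_key(partition_key, timetable):
    """Raise ValueError when a partition key targets a non-partitioned timetable."""
    if (
        partition_key is not None
        # getattr with a default tolerates timetables that lack the attribute.
        and not getattr(timetable, "partitioned", False)
        and not getattr(timetable, "partitioned_at_runtime", False)
    ):
        # Placeholder message, not the one used in the PR.
        raise ValueError("partition_key given for a non-partitioned DAG")


def rejects(partition_key, timetable):
    """Return True if the guard rejects the input with ValueError."""
    try:
        check_partition_key(partition_key, timetable)
    except ValueError:
        return True
    return False
```

With this shape, both stand-in timetables are rejected with `ValueError` when a 
key is supplied, and neither is touched when `partition_key` is `None`.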



##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py:
##########
@@ -2466,6 +2467,74 @@ def test_custom_timetable_generate_run_id_for_manual_trigger(self, dag_maker, te
         run = session.scalars(select(DagRun).where(DagRun.run_id == run_id_without_logical_date)).one()
         assert run.dag_id == custom_dag_id
 
+    def test_should_respond_400_when_partition_key_given_for_non_partitioned_dag(self, test_client):

Review Comment:
   This uses `DAG1_ID`, which is `schedule=None` (`NullTimetable`, a core class 
with `.partitioned` defined), so it doesn't cover a custom timetable 
subclassing the SDK `BaseTimetable`, which is exactly the case that raises 
`AttributeError` in the guard above. Adding a custom-timetable case here would 
pin that path.
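
   Roughly the shape of the regression case I have in mind, as a standalone 
sketch. Everything here is hypothetical (`CustomTimetable`, the two guard 
functions, and `status_for`, which is a simplified model of the route mapping 
`ValueError` to 400 and anything else to 500); a real test would go through 
`test_client` and `dag_maker` instead.

```python
class CustomTimetable:
    """Hypothetical stand-in for a user timetable built on the SDK base class.

    It defines `partitioned_at_runtime` but deliberately not `partitioned`.
    """

    partitioned_at_runtime = False


def guard_direct(partition_key, timetable):
    """Guard using direct attribute access, as in the diff hunk."""
    if (
        partition_key is not None
        and not timetable.partitioned
        and not timetable.partitioned_at_runtime
    ):
        raise ValueError("placeholder message")


def guard_getattr(partition_key, timetable):
    """Same guard, but tolerant of a missing `partitioned` attribute."""
    if (
        partition_key is not None
        and not getattr(timetable, "partitioned", False)
        and not getattr(timetable, "partitioned_at_runtime", False)
    ):
        raise ValueError("placeholder message")


def status_for(guard, partition_key, timetable):
    """Simplified model of the route: ValueError -> 400, other errors -> 500."""
    try:
        guard(partition_key, timetable)
    except ValueError:
        return 400
    except Exception:
        return 500
    return 200


def test_partition_key_with_custom_timetable_is_400(guard=guard_getattr):
    # Fails with 500 if run against guard_direct, which is the point of the case.
    assert status_for(guard, "2024-01-01", CustomTimetable()) == 400
```

Against the direct-access guard the same input comes back as 500 in this model, 
so the case would fail before the fix and pass after it.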



##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##########
@@ -161,6 +161,14 @@ def check_data_intervals(self):
         return self
 
     def validate_context(self, dag: SerializedDAG) -> dict:
+        if (
+            self.partition_key is not None
+            and not dag.timetable.partitioned
+            and not dag.timetable.partitioned_at_runtime
+        ):
+            raise ValueError(

Review Comment:
   This rejection only runs on the public REST trigger path. The Execution API 
route (`trigger_dag_run` in `execution_api/routes/dag_runs.py:144`) passes 
`partition_key` straight into `trigger_dag` then `create_dagrun` with no 
partitionability check, so a non-partitioned run can still be created with a 
key there, and with the new inheritance it then emits partitioned-looking 
AssetEvents. `TriggerDagRunOperator` doesn't expose `partition_key` today so it 
isn't reachable through the shipped operator, but the wire field is forwarded 
ungated. Putting the check in `create_dagrun`/`DagRun.__init__` would cover 
both entrypoints instead of just the request body.
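
   To illustrate the idea only: a sketch where the check lives in one creation 
function that both entrypoints call. `FakeDag`, `create_dagrun`, `rest_trigger` 
and `execution_api_trigger` below are hypothetical stand-ins with simplified 
signatures, not the real Airflow functions.

```python
from dataclasses import dataclass


@dataclass
class FakeDag:
    """Hypothetical stand-in carrying only the two partition flags."""

    partitioned: bool = False
    partitioned_at_runtime: bool = False


def create_dagrun(dag, partition_key=None):
    """Single choke point: every entrypoint funnels through this check."""
    if partition_key is not None and not (
        dag.partitioned or dag.partitioned_at_runtime
    ):
        # Placeholder message, not the one used in the PR.
        raise ValueError("partition_key given for a non-partitioned DAG")
    # Stand-in for the created run.
    return {"partition_key": partition_key}


def rest_trigger(dag, body):
    """Stand-in for the public REST path."""
    return create_dagrun(dag, body.get("partition_key"))


def execution_api_trigger(dag, payload):
    """Stand-in for the Execution API path, which forwards the wire field."""
    return create_dagrun(dag, payload.get("partition_key"))
```

Because the rejection sits below both callers, neither path can create a keyed 
run for a non-partitioned DAG, and the request-body validation becomes an early 
exit rather than the only line of defense.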



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
