nathadfield commented on PR #67285: URL: https://github.com/apache/airflow/pull/67285#issuecomment-4678238173
@Lee-W rebased this onto main now that #68266 is in, and reworked it so it builds on your resolver instead of re-implementing the same thing. Quick rundown for when you get a chance to look again. The scheduler now leans entirely on your `_resolve_partition_date` / `to_partition_date` for temporal and composite mappers. I pulled out our old `_compute_target_partition_date` temporal and rollup branching and put `temporal.py` back to your version, so the `to_downstream_normalized` refactor is gone. What's left on our side is just the bit #68266 can't do: the IdentityMapper case. Its key can't be turned back into a date, so we carry the producer's date onto the APDR and the scheduler falls back to that when the resolver doesn't return one. Plus the exposure layer that was always the point of this PR: `Context["partition_date"]`, the execution-API `DagRun` field (Cadwyn-versioned), `DAGRunResponse`, OTel, and the docs. One thing I changed in your code that I want to call out: `_resolve_partition_date` now returns `(anchor, suppressed)` instead of just `datetime | None`. I needed to tell apart "no temporal mapper, so fall back to the identity carry" from "temporal mappers disagreed or one raised, so leave it unset." Without that distinction, an identity carry on the same APDR could quietly paper over a conflict you'd just logged. So `suppressed=True` on the conflict/error paths, and `(None, False)` when nothing temporal contributed. `test_resolve_partition_date` is updated to match. Very open to a different shape if you'd rather signal that another way. A couple of other things while I was in here. `_get_or_create_apdr` now drops the carried date to `None` when two events land on the same key with different dates, rather than letting arrival order pick the winner. And I fixed the docs that wrongly claimed `RollupMapper` and `ChainMapper` always leave `partition_date` as `None`, since they actually delegate through `to_partition_date` now (your earlier point about rollup). Last one is more of a question. `register_asset_change` picked up a `partition_date` kwarg, next to the existing `partition_key`. A custom `asset_manager_class` on the old signature without `**kwargs` would hit a `TypeError`. I left it as normal signature evolution rather than bolting on a shim for a single param, but shout if you'd want a compat note in the changelog. Thanks again for the steer on all this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
