carlinix opened a new pull request, #56910:
URL: https://github.com/apache/airflow/pull/56910
This PR fixes two inconsistencies between deferrable and non-deferrable
execution modes of the `S3KeySensor`:
1. Task context (`context`) is lost when resuming from the trigger.
2. `metadata_keys` is ignored — the deferrable trigger (`S3KeyTrigger`) only
returns object names (`list[str]`) instead of metadata dictionaries.
Both behaviors cause user-defined `check_fn` functions to fail or behave
inconsistently between execution modes.
### Reproduction
Use the following DAG:
```python
from __future__ import annotations

from datetime import datetime, UTC
from typing import Any

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sdk.execution_time.context import get_current_context


def ensure_datetime(value, fallback):
    if not value:
        return fallback
    if isinstance(value, datetime):
        return value
    return datetime.fromisoformat(value)


def check_new_objects(files: list[dict[str, Any]], **context: Any) -> bool:
    ti = context.get("ti") or get_current_context().get("ti")
    # "start_task" is an upstream task in the full DAG, omitted here for brevity
    extractor = ti.xcom_pull(task_ids="start_task", key="extractor") or {}
    print(type(files), files)
    print(type(context), context)
    return True


check_s3 = S3KeySensor(
    task_id="check_new_s3_objects",
    bucket_name="example-bucket",
    bucket_key="example-prefix/*",
    aws_conn_id="aws_default",
    wildcard_match=True,
    metadata_keys=["Key", "LastModified", "Size", "ETag"],
    poke_interval=30,
    timeout=300,
    deferrable=True,
    check_fn=check_new_objects,
)
```
Observed logs:
```
<class 'list'> ['XYZ YTD Export.csv', 'XYZ_Weekly_Export.csv']
<class 'dict'> {}
```
* `files` is a list of plain strings, and `context` is an empty dict.

With `deferrable=False`, `files` contains full metadata dictionaries and
`context` is populated as expected.
### Root Cause
1. Context loss:
`S3KeySensor.execute_complete()` currently calls
`found_keys = self.check_fn(event["files"])`
instead of passing `context` as `_check_key()` does:
```python
if any(param.kind == inspect.Parameter.VAR_KEYWORD for param in signature.parameters.values()):
    return self.check_fn(files, **context)
```
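To make that dispatch concrete, here is a minimal, self-contained sketch of the signature-inspection logic that both code paths should share (the helper name `call_check_fn` is ours for illustration, not Airflow's):

```python
import inspect
from typing import Any, Callable


def call_check_fn(check_fn: Callable, files: list, context: dict[str, Any]):
    """Pass **context only when check_fn declares a **kwargs parameter."""
    signature = inspect.signature(check_fn)
    accepts_kwargs = any(
        param.kind == inspect.Parameter.VAR_KEYWORD
        for param in signature.parameters.values()
    )
    return check_fn(files, **context) if accepts_kwargs else check_fn(files)
```

A `check_fn` defined as `def f(files)` is then called without context, while `def f(files, **context)` receives the full task context in either execution mode.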
2. Metadata loss:
`S3KeyTrigger` emits only:
`TriggerEvent({"status": "running", "files": keys})` where `keys` is a
`list[str]` from `get_files_async()`, ignoring `metadata_keys`.
### Fixes Introduced
- Fix 1: Pass the task `context` to `check_fn` in `execute_complete()` by
replicating the signature-inspection logic from `_check_key()`.
- Fix 2: Add `metadata_keys` support to `S3KeyTrigger`, fetching object
metadata (via `list_objects_v2` or `head_object`) in deferrable mode.
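As an illustration of the `metadata_keys` semantics behind Fix 2, a hypothetical helper that trims a full object record down to the requested keys might look like this (`select_metadata` is a sketch, not the actual trigger implementation):

```python
from typing import Any


def select_metadata(obj: dict[str, Any], metadata_keys: list[str]) -> dict[str, Any]:
    """Keep only the requested keys from a list_objects_v2/head_object record.

    A literal "*" requests the full record, mirroring the sensor's
    non-deferrable behavior.
    """
    if "*" in metadata_keys:
        return obj
    return {k: v for k, v in obj.items() if k in metadata_keys}
```

With a helper like this, the trigger can emit the same dictionary shape the non-deferrable `poke()` path produces, instead of bare key strings.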
### Expected Behavior
Both modes (`deferrable=True` and `deferrable=False`) now behave
consistently:
* `check_fn` always receives files as a list of metadata dictionaries.
* `context` always includes task instance and DAG context values (ti,
dag_run, etc.).
* `metadata_keys=["*"]` returns the full `head_object()` data.
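For example, a `check_fn` can now rely on the same dictionary shape in both modes (the filtering rule below is purely illustrative, as are the sample records):

```python
from typing import Any


def require_nonempty_csvs(files: list[dict[str, Any]], **context: Any) -> bool:
    """Accept only when every matched object is a non-empty .csv file."""
    return all(
        f.get("Key", "").endswith(".csv") and f.get("Size", 0) > 0
        for f in files
    )


# Shape of the payload check_fn receives with metadata_keys=["Key", "Size"]
files = [
    {"Key": "XYZ YTD Export.csv", "Size": 2048},
    {"Key": "XYZ_Weekly_Export.csv", "Size": 512},
]
```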
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]