paultmathew opened a new issue, #67224:
URL: https://github.com/apache/airflow/issues/67224

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   `apache-airflow-providers-cncf-kubernetes==10.14.0`. The same code shape is 
present on `main` (`10.16.1`), so the bug is current.
   
   ### Apache Airflow version
   
   3.2.0
   
   ### Operating System
   
   Linux (EKS)
   
   ### Deployment
   
   Other (custom)
   
   ### Deployment details
   
   Deferrable `KubernetesPodOperator` with `triggerer`. Issue is 
platform-independent.
   
   ### What happened?
   
   `KubernetesPodOperator(do_xcom_push=True, multiple_outputs=True, 
deferrable=True)` does not fan out the sidecar's `return.json` dict into 
per-key XComs. Only `return_value` is published. Downstream tasks that 
subscript a key (e.g. `trigger.output["export_arn"]` resolving to 
`xcom_pull(key="export_arn")`) silently get `None`.
   
   The same operator with `deferrable=False` works correctly — the dict is 
fanned out as `BaseOperator.multiple_outputs` documents.
   
   **Real production trace.** The DAG runs `trigger_export` → `wait_export`, both 
deferrable KPO. `trigger_export` sets `multiple_outputs=True`, and its sidecar 
writes `{"export_arn": "arn:..."}` to `/airflow/xcom/return.json`. 
`wait_export.arguments` includes `trigger.output["export_arn"]`.
   
   After `trigger_export` succeeds, the wait pod renders with the 
resolved-to-None argument:
   
   ```
   WARNING - No XCom value found; defaulting to None.
             key=export_arn  task_id=trigger_export

   # Pod spec args (sanitized):
   'args': ['--namespace', '...', '--table', '...', '--export-arn', '']
   ```
   
   CLI in the pod then fails AWS validation:
   
   ```
   botocore.exceptions.ParamValidationError: Parameter validation failed:
   Invalid length for parameter ExportArn, value: 0, valid min length: 37
   ```
   
   The failure surfaces back via the deferrable re-entry path:
   
   ```
   File .../providers/cncf/kubernetes/operators/pod.py, line 937  in trigger_reentry
   File .../providers/cncf/kubernetes/operators/pod.py, line 1019 in _clean
   File .../providers/cncf/kubernetes/operators/pod.py, line 1053 in post_complete_action
   File .../providers/cncf/kubernetes/operators/pod.py, line 1100 in cleanup
   ```
   
   The sidecar exited cleanly (`exit_code: 0`) and the base container's 
`return.json` was complete — the dict was never fanned out into individual XCom 
keys.
   
   ### What you think should happen instead?
   
   `BaseOperator.multiple_outputs` is documented as:
   
   > if True and do_xcom_push is True, pushes multiple XComs, one for each key 
in the returned dictionary result.
   
   This contract should hold regardless of `deferrable`. The asymmetry is what 
makes this a bug rather than a missing feature: the failure is silent 
(downstream just gets `None`), and there's no way for the user to discover the 
flag works selectively based on `deferrable`.
   
   ### How to reproduce
   
   ```python
   import pendulum
   from airflow.sdk import dag
   from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

   XCOM_PAYLOAD = (
       'mkdir -p /airflow/xcom && '
       'echo \'{"export_arn": "arn:aws:dynamodb:::export/x"}\' > /airflow/xcom/return.json'
   )


   @dag(start_date=pendulum.datetime(2026, 1, 1), schedule=None, catchup=False)
   def kpo_multiple_outputs_repro():
       common = dict(
           namespace="default",
           image="busybox",
           cmds=["sh", "-c"],
           arguments=[XCOM_PAYLOAD],
           do_xcom_push=True,
           multiple_outputs=True,
       )

       KubernetesPodOperator(task_id="sync_pod", deferrable=False, **common)
       KubernetesPodOperator(task_id="deferred_pod", deferrable=True, **common)


   kpo_multiple_outputs_repro()
   ```
   
   Inspect XComs after the run:
   
   ```sql
   SELECT task_id, key FROM xcom WHERE dag_id = 'kpo_multiple_outputs_repro';
   ```
   
   Expected:
   ```
   sync_pod      | return_value
   sync_pod      | export_arn
   deferred_pod  | return_value
   deferred_pod  | export_arn   ← missing today
   ```
   
   Actual: only the two `return_value` rows and `sync_pod | export_arn`. 
`deferred_pod | export_arn` is absent.
   
   ### Root cause
   
   The two execution paths handle the sidecar output differently:
   
   **Sync path** (`pod.py:759-760` in 10.14.0):
   ```python
   if self.do_xcom_push:
       return result
   ```
   Returns the dict. The task runner's `_push_xcom_if_needed` 
(`task-sdk/.../execution_time/task_runner.py:1672`) then honors 
`multiple_outputs` and fans the dict out via the loop at `line 1700`. ✅
   
   **Deferred path** (`pod.py:993-996` in 10.14.0):
   ```python
   finally:
       self._clean(event=event, context=context, result=xcom_sidecar_output)
       if self.do_xcom_push and xcom_sidecar_output:
           context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_sidecar_output)
   ```
   Pushes XCom manually and does not return the value. `_push_xcom_if_needed` 
never runs. `multiple_outputs` is silently ignored. ❌
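
The asymmetry can be modeled without Airflow. The sketch below is a toy, not the provider's or the task runner's code: `runner_push`, `run_sync`, and `run_deferred` are hypothetical names, and the XCom store is a plain dict. It only illustrates why a value returned to the runner gets fanned out while a value pushed manually under `return_value` does not.

```python
from typing import Any

XCOM_RETURN_KEY = "return_value"


def runner_push(store: dict[str, Any], result: Any, multiple_outputs: bool) -> None:
    """Toy stand-in (hypothetical) for the runner's post-execute XCom push."""
    if result is None:
        return  # nothing was returned, so there is nothing to fan out
    if multiple_outputs and isinstance(result, dict):
        for key, value in result.items():
            store[key] = value  # one XCom per key
    store[XCOM_RETURN_KEY] = result


def run_sync(sidecar_output: dict[str, Any]) -> dict[str, Any]:
    store: dict[str, Any] = {}
    result = sidecar_output  # the sync path returns the dict
    runner_push(store, result, multiple_outputs=True)
    return store


def run_deferred(sidecar_output: dict[str, Any]) -> dict[str, Any]:
    store: dict[str, Any] = {}
    if sidecar_output:
        store[XCOM_RETURN_KEY] = sidecar_output  # manual push, single key only
    result = None  # the re-entry method returns nothing
    runner_push(store, result, multiple_outputs=True)
    return store


payload = {"export_arn": "arn:aws:dynamodb:::export/x"}
sync_store = run_sync(payload)
deferred_store = run_deferred(payload)

print(sorted(sync_store))                # ['export_arn', 'return_value']
print(sorted(deferred_store))            # ['return_value']
print(deferred_store.get("export_arn"))  # None
```

In this model the deferred store holds the full dict under `return_value`, which matches the observation that the sidecar output was complete and only the per-key entries were missing.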
   
   ### Suggested fix
   
   Make `trigger_reentry` return `xcom_sidecar_output` and let the task runner 
handle the push. Same code path the sync version already exercises:
   
   ```diff
    finally:
   -    self._clean(event=event, context=context, result=xcom_sidecar_output)
   -    if self.do_xcom_push and xcom_sidecar_output:
   -        context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_sidecar_output)
   +    self._clean(event=event, context=context, result=xcom_sidecar_output)
   +
   +if self.do_xcom_push:
   +    return xcom_sidecar_output
   ```
   
   The `cleanup`-time push at `pod.py:1100` (failure path) has the same shape 
and likely the same bug, though it only matters when a failed pod opted into both 
`do_xcom_push` and `multiple_outputs`.
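
The control flow of the suggested change can be checked in isolation. The following is a minimal sketch under stated assumptions: `ToyOperator` and the shape of the `event` dict (`status`, `xcom`, `message` keys) are hypothetical and do not mirror the real operator or trigger payload. It shows only that cleanup in `finally` still runs on both success and failure, and that the value is returned after cleanup on success.

```python
from typing import Any, Optional


class ToyOperator:
    """Hypothetical stand-in; mirrors only the try/finally/return shape."""

    def __init__(self, do_xcom_push: bool) -> None:
        self.do_xcom_push = do_xcom_push
        self.calls: list[str] = []

    def _clean(self, result: Any) -> None:
        self.calls.append("clean")

    def trigger_reentry(self, event: dict[str, Any]) -> Optional[dict[str, Any]]:
        xcom_sidecar_output = None
        try:
            if event["status"] != "success":
                raise RuntimeError(event.get("message", "pod failed"))
            xcom_sidecar_output = event.get("xcom")
        finally:
            # Runs whether the try block succeeded or raised.
            self._clean(result=xcom_sidecar_output)

        # Reached only on success: hand the value back instead of pushing it.
        if self.do_xcom_push:
            return xcom_sidecar_output
        return None


op = ToyOperator(do_xcom_push=True)
returned = op.trigger_reentry(
    {"status": "success", "xcom": {"export_arn": "arn:aws:dynamodb:::export/x"}}
)
print(returned, op.calls)

failing = ToyOperator(do_xcom_push=True)
try:
    failing.trigger_reentry({"status": "failed", "message": "boom"})
except RuntimeError as exc:
    print(exc, failing.calls)
```

Returning the value keeps a single push path, so whatever handles `multiple_outputs` for the sync case would apply to the deferred case as well.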
   
   ### Workaround
   
   Keep `multiple_outputs=False` and use Jinja in the consumer's templated 
fields:
   `"{{ ti.xcom_pull(task_ids='trigger_export')['export_arn'] }}"`. This works in 
both modes but defeats the purpose of `multiple_outputs` and prevents `XComArg` 
subscripts in TaskFlow expressions.
   
   ### Anything else?
   
   Related but distinct:
   
   - #24487 — Dynamic mapping over KPO results triplicates child tasks
   - #37577 — `@task.kubernetes` + `LazyXComAccess` interaction
   - #37297 (closed) — added `multiple_outputs` to `BaseOperator` in 2.9.0
   
   None cover the sync-vs-deferred `multiple_outputs` asymmetry.
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

