Copilot commented on code in PR #64187:
URL: https://github.com/apache/airflow/pull/64187#discussion_r3025328735
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/eks.py:
##########
@@ -89,6 +93,132 @@ async def run(self):
yield TriggerEvent({"status": "success"})
+class EksPodTrigger(KubernetesPodTrigger):
+ """
+ KubernetesPodTrigger for EKS that generates fresh kubeconfig with new
credentials.
+
+ When ``EksPodOperator`` defers, the kubeconfig stored in ``config_dict``
contains
+ an exec command that references a temporary credentials file. That file is
cleaned
+ up when the operator's context managers exit (on deferral). By the time
the trigger
+ runs — whether in a real triggerer process or inline via ``dag.test()`` —
the file
+ is gone.
+
+ This trigger solves the problem by regenerating the kubeconfig with fresh
AWS
+ credentials before executing. The temporary files are kept alive for the
entire
+ duration of the trigger's ``run()`` method.
+
+ :param eks_cluster_name: The name of the Amazon EKS Cluster.
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ :param region: Which AWS region the connection should use.
+ """
+
+ def __init__(
+ self,
+ *,
+ eks_cluster_name: str,
+ aws_conn_id: str | None = None,
+ region: str | None = None,
+ pod_name: str,
+ pod_namespace: str,
+ trigger_start_time: datetime.datetime,
+ base_container_name: str,
+ kubernetes_conn_id: str | None = None,
+ connection_extras: dict | None = None,
+ poll_interval: float = 2,
+ cluster_context: str | None = None,
+ config_dict: dict | None = None,
+ in_cluster: bool | None = None,
+ get_logs: bool = True,
+ startup_timeout: int = 120,
+ startup_check_interval: float = 5,
+ schedule_timeout: int = 120,
+ on_finish_action: str = "delete_pod",
+ on_kill_action: str = "delete_pod",
+ termination_grace_period: int | None = None,
+ last_log_time: DateTime | None = None,
+ logging_interval: int | None = None,
+ trigger_kwargs: dict | None = None,
+ ):
+ super().__init__(
+ pod_name=pod_name,
+ pod_namespace=pod_namespace,
+ trigger_start_time=trigger_start_time,
+ base_container_name=base_container_name,
+ kubernetes_conn_id=kubernetes_conn_id,
+ connection_extras=connection_extras,
+ poll_interval=poll_interval,
+ cluster_context=cluster_context,
+ config_dict=config_dict,
+ in_cluster=in_cluster,
+ get_logs=get_logs,
+ startup_timeout=startup_timeout,
+ startup_check_interval=startup_check_interval,
+ schedule_timeout=schedule_timeout,
+ on_finish_action=on_finish_action,
+ on_kill_action=on_kill_action,
+ termination_grace_period=termination_grace_period,
+ last_log_time=last_log_time,
+ logging_interval=logging_interval,
+ trigger_kwargs=trigger_kwargs,
+ )
+ self.eks_cluster_name = eks_cluster_name
+ self._aws_conn_id = aws_conn_id
+ self.region = region
+
+ def serialize(self) -> tuple[str, dict[str, Any]]:
+ """Serialize EksPodTrigger arguments and classpath."""
+ _, kwargs = super().serialize()
+ kwargs["eks_cluster_name"] = self.eks_cluster_name
+ kwargs["aws_conn_id"] = self._aws_conn_id
+ kwargs["region"] = self.region
+ return (
+ "airflow.providers.amazon.aws.triggers.eks.EksPodTrigger",
+ kwargs,
+ )
+
+ async def run(self):
+ """Generate fresh kubeconfig, then delegate to the parent trigger."""
+ from airflow.utils import yaml
+
+ eks_hook = EksHook(
+ aws_conn_id=self._aws_conn_id,
+ region_name=self.region,
+ )
+ session = eks_hook.get_session()
+ credentials_obj = session.get_credentials()
+ if credentials_obj is None:
+ raise RuntimeError(
Review Comment:
Raising a bare `RuntimeError` from a trigger makes failures less consistent
with other Airflow components and can reduce the quality and structure of
failure reporting. Prefer raising `AirflowException` for consistent error
handling; alternatively, if this trigger is expected to report failures via
events, catch the failure and yield a `TriggerEvent` that clearly communicates
the error status and message to the deferred task.
```suggestion
raise AirflowException(
```
##########
providers/amazon/tests/unit/amazon/aws/triggers/test_eks.py:
##########
@@ -318,3 +320,134 @@ async def
test_when_there_are_no_fargate_profiles_it_should_only_log_message(self
self.trigger.log.info.assert_called_once_with(
"No Fargate profiles associated with cluster %s", CLUSTER_NAME
)
+
+
+class TestEksPodTrigger:
+ """Tests for EksPodTrigger."""
+
+ TRIGGER_START_TIME = datetime.datetime(2026, 1, 1,
tzinfo=datetime.timezone.utc)
+
+ def _create_trigger(self, **overrides):
+ """Create an EksPodTrigger with sensible defaults."""
+ defaults = {
+ "eks_cluster_name": CLUSTER_NAME,
+ "aws_conn_id": AWS_CONN_ID,
+ "region": REGION_NAME,
+ "pod_name": "test-pod",
+ "pod_namespace": "default",
+ "trigger_start_time": self.TRIGGER_START_TIME,
+ "base_container_name": "base",
+ "config_dict": {"old": "stale-config"},
+ }
+ defaults.update(overrides)
+ return EksPodTrigger(**defaults)
+
+ def test_serialize_includes_eks_fields(self):
+ """serialize() should include eks_cluster_name, aws_conn_id, and
region."""
+ trigger = self._create_trigger()
+ classpath, kwargs = trigger.serialize()
+
+ assert classpath ==
"airflow.providers.amazon.aws.triggers.eks.EksPodTrigger"
+ assert kwargs["eks_cluster_name"] == CLUSTER_NAME
+ assert kwargs["aws_conn_id"] == AWS_CONN_ID
+ assert kwargs["region"] == REGION_NAME
+ # Also verify parent fields are present
+ assert kwargs["pod_name"] == "test-pod"
+ assert kwargs["pod_namespace"] == "default"
+
+ def test_serialize_roundtrip(self):
+ """A trigger created from serialized kwargs should serialize
identically."""
+ trigger = self._create_trigger()
+ classpath, kwargs = trigger.serialize()
+
+ trigger2 = EksPodTrigger(**kwargs)
+ classpath2, kwargs2 = trigger2.serialize()
+
+ assert classpath == classpath2
+ assert kwargs == kwargs2
+
+ @pytest.mark.asyncio
+
@patch("airflow.providers.cncf.kubernetes.triggers.pod.KubernetesPodTrigger.run")
+
@patch("airflow.providers.amazon.aws.hooks.eks.EksHook.generate_config_file")
+
@patch("airflow.providers.amazon.aws.hooks.eks.EksHook._secure_credential_context")
+ @patch("airflow.providers.amazon.aws.hooks.eks.EksHook.get_session")
+ @patch("airflow.providers.amazon.aws.hooks.eks.EksHook.__init__",
return_value=None)
+ async def test_run_generates_fresh_kubeconfig(
+ self,
+ mock_eks_hook_init,
+ mock_get_session,
+ mock_secure_credential_context,
+ mock_generate_config_file,
+ mock_parent_run,
+ ):
+ """run() should get fresh credentials, generate kubeconfig, and
delegate to parent."""
+ # Set up credential mocks
+ mock_session = MagicMock()
+ mock_credentials = MagicMock()
+ mock_frozen = MagicMock()
+ mock_frozen.access_key = "AKIATEST"
+ mock_frozen.secret_key = "secret123"
+ mock_frozen.token = "token456"
+ mock_get_session.return_value = mock_session
+ mock_session.get_credentials.return_value = mock_credentials
+ mock_credentials.get_frozen_credentials.return_value = mock_frozen
+
+ # Set up context manager mocks
+ mock_secure_credential_context.return_value.__enter__.return_value =
"/tmp/test.aws_creds"
+ mock_generate_config_file.return_value.__enter__.return_value =
"/tmp/test_kubeconfig"
+
+ # Mock reading the kubeconfig file
+ with patch("pathlib.Path.read_text", return_value="apiVersion:
v1\nkind: Config\nclusters: []"):
+ # Mock the parent run to yield one event
+ mock_parent_run.return_value.__aiter__ = AsyncMock(
+ return_value=AsyncMock(
+ __anext__=AsyncMock(side_effect=[TriggerEvent({"status":
"success"}), StopAsyncIteration])
+ )
+ )
+
+ # Use a simpler approach: just make parent run an async generator
+ async def mock_gen():
+ yield TriggerEvent({"status": "success"})
+
+ mock_parent_run.return_value = mock_gen()
Review Comment:
The test sets up two different async-iteration strategies for the same mock
(`__aiter__`/`__anext__` and then an async generator), but the first setup is
immediately overridden and never used. Removing the dead setup will make the
test easier to follow and reduce the chance of future edits accidentally
relying on the wrong mocking approach.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]