jedcunningham commented on code in PR #62129:
URL: https://github.com/apache/airflow/pull/62129#discussion_r2849269672
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -553,12 +557,28 @@ def run_next(self, next_job: KubernetesJob) -> None:
pod_template_file = next_job.pod_template_file
dag_id, task_id, run_id, try_number, map_index = key
+
+ pod_id = create_unique_id(dag_id, task_id)
+ secret_name = ""
+
if len(command) == 1:
from airflow.executors.workloads import ExecuteTask
if isinstance(command[0], ExecuteTask):
workload = command[0]
- command = workload_to_command_args(workload)
+ secret_name = f"{WORKLOAD_SECRET_VOLUME_NAME}-{pod_id}"
+ self.kube_client.create_namespaced_secret(
Review Comment:
+1
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py:
##########
@@ -199,3 +199,16 @@ def _delete_pod(name, namespace):
print(f'Deleting POD "{name}" from "{namespace}" namespace')
api_response = kube_client.delete_namespaced_pod(name=name,
namespace=namespace, body=delete_options)
print(api_response)
+ _delete_workload_secret(name, namespace)
Review Comment:
We also need to handle orphaned secrets. Both here in the cleanup command
(we'd only clean them up if we have a pod) and if the pod failed to get created
or the patch for ownerRef failed.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py:
##########
@@ -199,3 +199,16 @@ def _delete_pod(name, namespace):
print(f'Deleting POD "{name}" from "{namespace}" namespace')
api_response = kube_client.delete_namespaced_pod(name=name,
namespace=namespace, body=delete_options)
print(api_response)
+ _delete_workload_secret(name, namespace)
+
+
+def _delete_workload_secret(pod_name, namespace):
+ """Delete the workload secret associated with a pod if it exists."""
+ kube_client = get_kube_client()
+ secret_name = f"airflow-workload-{pod_name}"
+ try:
+ kube_client.delete_namespaced_secret(name=secret_name,
namespace=namespace)
+ print(f'Deleted workload secret "{secret_name}" from "{namespace}"
namespace')
+ except ApiException as e:
+ if str(e.status) != "404":
Review Comment:
```suggestion
if e.status != 404:
```
Already an int, no?
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -553,12 +558,38 @@ def run_next(self, next_job: KubernetesJob) -> None:
pod_template_file = next_job.pod_template_file
dag_id, task_id, run_id, try_number, map_index = key
+
+ pod_id = create_unique_id(dag_id, task_id)
+ secret_name = ""
+
if len(command) == 1:
from airflow.executors.workloads import ExecuteTask
if isinstance(command[0], ExecuteTask):
workload = command[0]
- command = workload_to_command_args(workload)
+ secret_name = f"{WORKLOAD_SECRET_NAME}-{pod_id}"
Review Comment:
Any concerns on this secret name now being too long since we have the prefix?
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py:
##########
@@ -54,28 +54,25 @@
if TYPE_CHECKING:
import datetime
- from airflow.executors import workloads
from airflow.models.taskinstance import TaskInstance
log = logging.getLogger(__name__)
MAX_LABEL_LEN = 63
+WORKLOAD_SECRET_VOLUME_NAME = "airflow-workload"
+WORKLOAD_SECRET_NAME = "airflow-workload"
Review Comment:
```suggestion
WORKLOAD_SECRET_NAME_PREFIX = "airflow-workload"
```
nit
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]