dondaum commented on issue #17604: URL: https://github.com/apache/airflow/issues/17604#issuecomment-898663582
So I did tested the workaround with a quick and dirty subclass and overriding of the execute method. For each env value I call mask_secret(): ```python class KubernetesPodOperator_(KubernetesPodOperator): def __init__( self, *args, **kwargs, ): super().__init__(*args, **kwargs) def _get_secret_var(self) -> List[k8s.V1EnvVar]: try: sec = Variable.get( "testvar", deserialize_json=True ) except KeyError: self.log.warning( "You have to add the variable in Airflow" ) sec = { "DBT_WAREHOUSE": "Default", } return [ k8s.V1EnvVar( name="DBT_DATABASE", value=sec["DBT_WAREHOUSE"], ), ] def execute(self, context): from airflow.utils.log.secrets_masker import mask_secret self.log.info( f"Custom KubernetesPodOperator runs K8S Task" ) self.env_vars = self._get_secret_var() for secret_env in self.env_vars: mask_secret(secret_env.value) return super().execute(context) ``` Afterwards I had a look in the logs and it seems that the masking now works but only in one of the two exception messages in the logs. Here is the detailed log message: ```python [2021-08-13 18:27:01,897] {{taskinstance.py:1501}} ERROR - Task failed with exception Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 368, in execute raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}') airflow.exceptions.AirflowException: Pod passing-test.ea5fb34fd1274264baa318e7d66098de returned a failure: {'api_version': 'v1', 'kind': 'Pod', 'metadata': {'annotations': None, 'cluster_name': None, 'creation_timestamp': datetime.datetime(2021, 8, 13, 18, 26, 33, tzinfo=tzlocal()), 'deletion_grace_period_seconds': None, 'deletion_timestamp': None, 'finalizers': None, 'generate_name': None, 'generation': None, 'initializers': None, 'labels': {'airflow_version': '2.1.2', 'dag_id': 'k8s_v3', 'execution_date': '2021-08-13T151421.6018540000-bfa4f3805', 'foo': 'bar', 'kubernetes_pod_operator': 'True', 'task_id': 'passing-task', 'try_number': '6'}, 'managed_fields': [{'api_version': 'v1', 'fields': None, 'manager': 'OpenAPI-Generator', 'operation': 'Update', 'time': datetime.datetime(2021, 8, 13, 18, 26, 33, tzinfo=tzlocal())}, {'api_version': 'v1', 'fields': None, 'manager': 'kubelet', 'operation': 'Update', 'time': datetime.datetime(2021, 8, 13, 18, 27, tzinfo=tzlocal())}], 'name': 'passing-test.ea5fb34fd1274264baa318e7d66098de', 'namespace': 'XXXXX', 'owner_references': None, 'resource_version': '90215538', 'self_link': None, 'uid': '00145ef8-992c-42d7-9ce3-f2a7990617f0'}, 'spec': {'active_deadline_seconds': None, 'affinity': {'node_affinity': None, 'pod_affinity': None, 'pod_anti_affinity': None}, 'automount_service_account_token': None, 'containers': [{'args': ['import sys; sys.exit(1)'], 'command': ['python', '-c'], 'env': [{'name': 'DBT_DATABASE', 'value': 'secret', 'value_from': None}], 'env_from': None, 'image': 'python:3.6', 'image_pull_policy': 'IfNotPresent', 'lifecycle': None, 'liveness_probe': None, 'name': 'base', 'ports': None, 'readiness_probe': None, 'resources': {'limits': {'cpu': '1500m', 'memory': '512Mi'}, 'requests': {'cpu': '512m', 'memory': '256Mi'}}, 'security_context': None, 'stdin': None, 'stdin_once': None, 'termination_message_path': '/dev/termination-log', 'termination_message_policy': 'File', 'tty': None, 'volume_devices': None, 'volume_mounts': [{'mount_path': '/var/run/secrets/kubernetes.io/serviceaccount', 'mount_propagation': None, 'name': 'default-token-k59nb', 'read_only': True, 'sub_path': None, 'sub_path_expr': None}], 'working_dir': None}], ... During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task self._prepare_and_execute_task_with_callbacks(context, task) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks result = self._execute_task(context, task_copy) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task result = task_copy.execute(context=context) File "/opt/airflow/plugins/airflow_dbt/operators/dbt_k8s_operator.py", line 61, in execute return super().execute(context) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 373, in execute raise AirflowException(f'Pod Launching failed: {ex}') airflow.exceptions.AirflowException: Pod Launching failed: Pod passing-test.ea5fb34fd1274264baa318e7d66098de returned a failure: {'api_version': 'v1', 'kind': 'Pod', 'metadata': {'annotations': None, 'cluster_name': None, 'creation_timestamp': datetime.datetime(2021, 8, 13, 18, 26, 33, tzinfo=tzlocal()), 'deletion_grace_period_seconds': None, 'deletion_timestamp': None, 'finalizers': None, 'generate_name': None, 'generation': None, 'initializers': None, 'labels': {'airflow_version': '2.1.2', 'dag_id': 'k8s_v3', 'execution_date': '2021-08-13T151421.6018540000-bfa4f3805', 'foo': 'bar', 'kubernetes_pod_operator': 'True', 'task_id': 'passing-task', 'try_number': '6'}, 'managed_fields': [{'api_version': 'v1', 'fields': None, 'manager': 'OpenAPI-Generator', 'operation': 'Update', 'time': datetime.datetime(2021, 8, 13, 18, 26, 33, tzinfo=tzlocal())}, {'api_version': 'v1', 'fields': None, 'manager': 'kubelet', 'operation': 'Update', 'time': datetime.datetime(2021, 8, 13, 18, 27, tzinfo=tzlocal())}], 'name': 'passing-test.ea5fb34fd1274264baa318e7d66098de', 'namespace': 'XXXXX', 'owner_references': None, 'resource_version': '90215538', 'self_link': None, 'uid': '00145ef8-992c-42d7-9ce3-f2a7990617f0'}, 'spec': {'active_deadline_seconds': None, 'affinity': {'node_affinity': None, 'pod_affinity': None, 'pod_anti_affinity': None}, 'automount_service_account_token': None, 'containers': [{'args': ['import sys; sys.exit(1)'], 'command': ['python', '-c'], 'env': [{'name': 'DBT_DATABASE', 'value': '***', 'value_from': None}], 'env_from': None, 'image': 'python:3.6', 'image_pull_policy': 'IfNotPresent', 'lifecycle': None, 'liveness_probe': None, 'name': 'base', 'ports': None, 'readiness_probe': None, 'resources': {'limits': {'cpu': '1500m', 'memory': '512Mi'}, 'requests': {'cpu': '512m', 'memory': '256Mi'}}, 'security_context': None, 'stdin': None, 'stdin_once': None, 'termination_message_path': '/dev/termination-log', 'termination_message_policy': 'File', 'tty': None, 'volume_devices': None, 'volume_mounts': [{'mount_path': '/var/run/***s/kubernetes.io/serviceaccount', 'mount_propagation': None, 'name': 'default-token-k59nb', 'read_only': True, 'sub_path': None, 'sub_path_expr': None}], 'working_dir': None}], ``` If a pod ends with an failure the KubernetesPodOperator seems to check the state first and if the state does not equal State.SUCCESS it raises an AirflowException. This exception is catched and raised again a couple of statements below. However the masking (at least how my very trivial subclass implements it) only works for the second exception message. [KubernetesPodOperator ](https://github.com/apache/airflow/blob/providers-cncf-kubernetes/2.0.0/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py) ```python else: self.log.info("creating pod with labels %s and launcher %s", labels, launcher) final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher) if final_state != State.SUCCESS: raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}') context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name) context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace) return result except AirflowException as ex: raise AirflowException(f'Pod Launching failed: {ex}') ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org