dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-898663582


   So I did tested the workaround with a quick and dirty subclass and 
overriding of the execute method. For each env value I call mask_secret(): 
   
   ```python
   class KubernetesPodOperator_(KubernetesPodOperator):
   
       def __init__(
           self,
           *args,
           **kwargs,
       ):
   
           super().__init__(*args, **kwargs)
   
       def _get_secret_var(self) -> List[k8s.V1EnvVar]:
           try:
               sec = Variable.get(
                   "testvar", deserialize_json=True
               )
           except KeyError:
               self.log.warning(
                   "You have to add the variable in Airflow"
               )
               sec = {
                   "DBT_WAREHOUSE": "Default",
               }
           return [
               k8s.V1EnvVar(
                   name="DBT_DATABASE",
                   value=sec["DBT_WAREHOUSE"],
               ),
           ]
   
       def execute(self, context):
           from airflow.utils.log.secrets_masker import mask_secret
           self.log.info(
               f"Custom KubernetesPodOperator runs K8S Task"
           )
           self.env_vars = self._get_secret_var()
           for secret_env in self.env_vars:
               mask_secret(secret_env.value)
           return super().execute(context)
   ```
   
   Afterwards I had a look in the logs and it seems that the masking now works 
but only in one of the two exception messages in the logs. 
   
   Here is the detailed log message:
   ```python
   [2021-08-13 18:27:01,897] {{taskinstance.py:1501}} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py",
 line 368, in execute
       raise AirflowException(f'Pod {self.pod.metadata.name} returned a 
failure: {remote_pod}')
   airflow.exceptions.AirflowException: Pod 
passing-test.ea5fb34fd1274264baa318e7d66098de returned a failure: 
{'api_version': 'v1',
    'kind': 'Pod',
    'metadata': {'annotations': None,
                 'cluster_name': None,
                 'creation_timestamp': datetime.datetime(2021, 8, 13, 18, 26, 
33, tzinfo=tzlocal()),
                 'deletion_grace_period_seconds': None,
                 'deletion_timestamp': None,
                 'finalizers': None,
                 'generate_name': None,
                 'generation': None,
                 'initializers': None,
                 'labels': {'airflow_version': '2.1.2',
                            'dag_id': 'k8s_v3',
                            'execution_date': 
'2021-08-13T151421.6018540000-bfa4f3805',
                            'foo': 'bar',
                            'kubernetes_pod_operator': 'True',
                            'task_id': 'passing-task',
                            'try_number': '6'},
                 'managed_fields': [{'api_version': 'v1',
                                     'fields': None,
                                     'manager': 'OpenAPI-Generator',
                                     'operation': 'Update',
                                     'time': datetime.datetime(2021, 8, 13, 18, 
26, 33, tzinfo=tzlocal())},
                                    {'api_version': 'v1',
                                     'fields': None,
                                     'manager': 'kubelet',
                                     'operation': 'Update',
                                     'time': datetime.datetime(2021, 8, 13, 18, 
27, tzinfo=tzlocal())}],
                 'name': 'passing-test.ea5fb34fd1274264baa318e7d66098de',
                 'namespace': 'XXXXX',
                 'owner_references': None,
                 'resource_version': '90215538',
                 'self_link': None,
                 'uid': '00145ef8-992c-42d7-9ce3-f2a7990617f0'},
    'spec': {'active_deadline_seconds': None,
             'affinity': {'node_affinity': None,
                          'pod_affinity': None,
                          'pod_anti_affinity': None},
             'automount_service_account_token': None,
             'containers': [{'args': ['import sys; sys.exit(1)'],
                             'command': ['python', '-c'],
                             'env': [{'name': 'DBT_DATABASE',
                                      'value': 'secret',
                                      'value_from': None}],
                             'env_from': None,
                             'image': 'python:3.6',
                             'image_pull_policy': 'IfNotPresent',
                             'lifecycle': None,
                             'liveness_probe': None,
                             'name': 'base',
                             'ports': None,
                             'readiness_probe': None,
                             'resources': {'limits': {'cpu': '1500m',
                                                      'memory': '512Mi'},
                                           'requests': {'cpu': '512m',
                                                        'memory': '256Mi'}},
                             'security_context': None,
                             'stdin': None,
                             'stdin_once': None,
                             'termination_message_path': '/dev/termination-log',
                             'termination_message_policy': 'File',
                             'tty': None,
                             'volume_devices': None,
                             'volume_mounts': [{'mount_path': 
'/var/run/secrets/kubernetes.io/serviceaccount',
                                                'mount_propagation': None,
                                                'name': 'default-token-k59nb',
                                                'read_only': True,
                                                'sub_path': None,
                                                'sub_path_expr': None}],
                             'working_dir': None}],
   ...
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 1157, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 1331, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 1361, in _execute_task
       result = task_copy.execute(context=context)
     File "/opt/airflow/plugins/airflow_dbt/operators/dbt_k8s_operator.py", 
line 61, in execute
       return super().execute(context)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py",
 line 373, in execute
       raise AirflowException(f'Pod Launching failed: {ex}')
   airflow.exceptions.AirflowException: Pod Launching failed: Pod 
passing-test.ea5fb34fd1274264baa318e7d66098de returned a failure: 
{'api_version': 'v1',
    'kind': 'Pod',
    'metadata': {'annotations': None,
                 'cluster_name': None,
                 'creation_timestamp': datetime.datetime(2021, 8, 13, 18, 26, 
33, tzinfo=tzlocal()),
                 'deletion_grace_period_seconds': None,
                 'deletion_timestamp': None,
                 'finalizers': None,
                 'generate_name': None,
                 'generation': None,
                 'initializers': None,
                 'labels': {'airflow_version': '2.1.2',
                            'dag_id': 'k8s_v3',
                            'execution_date': 
'2021-08-13T151421.6018540000-bfa4f3805',
                            'foo': 'bar',
                            'kubernetes_pod_operator': 'True',
                            'task_id': 'passing-task',
                            'try_number': '6'},
                 'managed_fields': [{'api_version': 'v1',
                                     'fields': None,
                                     'manager': 'OpenAPI-Generator',
                                     'operation': 'Update',
                                     'time': datetime.datetime(2021, 8, 13, 18, 
26, 33, tzinfo=tzlocal())},
                                    {'api_version': 'v1',
                                     'fields': None,
                                     'manager': 'kubelet',
                                     'operation': 'Update',
                                     'time': datetime.datetime(2021, 8, 13, 18, 
27, tzinfo=tzlocal())}],
                 'name': 'passing-test.ea5fb34fd1274264baa318e7d66098de',
                 'namespace': 'XXXXX',
                 'owner_references': None,
                 'resource_version': '90215538',
                 'self_link': None,
                 'uid': '00145ef8-992c-42d7-9ce3-f2a7990617f0'},
    'spec': {'active_deadline_seconds': None,
             'affinity': {'node_affinity': None,
                          'pod_affinity': None,
                          'pod_anti_affinity': None},
             'automount_service_account_token': None,
             'containers': [{'args': ['import sys; sys.exit(1)'],
                             'command': ['python', '-c'],
                             'env': [{'name': 'DBT_DATABASE',
                                      'value': '***',
                                      'value_from': None}],
                             'env_from': None,
                             'image': 'python:3.6',
                             'image_pull_policy': 'IfNotPresent',
                             'lifecycle': None,
                             'liveness_probe': None,
                             'name': 'base',
                             'ports': None,
                             'readiness_probe': None,
                             'resources': {'limits': {'cpu': '1500m',
                                                      'memory': '512Mi'},
                                           'requests': {'cpu': '512m',
                                                        'memory': '256Mi'}},
                             'security_context': None,
                             'stdin': None,
                             'stdin_once': None,
                             'termination_message_path': '/dev/termination-log',
                             'termination_message_policy': 'File',
                             'tty': None,
                             'volume_devices': None,
                             'volume_mounts': [{'mount_path': 
'/var/run/***s/kubernetes.io/serviceaccount',
                                                'mount_propagation': None,
                                                'name': 'default-token-k59nb',
                                                'read_only': True,
                                                'sub_path': None,
                                                'sub_path_expr': None}],
                             'working_dir': None}],
   ```
   
   If a pod ends with an failure the KubernetesPodOperator seems to check the 
state first and if the state does not equal State.SUCCESS it raises an 
AirflowException. This exception is catched and raised again a couple of 
statements below. However the masking (at least how my very trivial subclass 
implements it) only works for the second exception message.
   
   [KubernetesPodOperator 
](https://github.com/apache/airflow/blob/providers-cncf-kubernetes/2.0.0/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py)
   
   ```python
               else:
                   self.log.info("creating pod with labels %s and launcher %s", 
labels, launcher)
                   final_state, remote_pod, result = 
self.create_new_pod_for_operator(labels, launcher)
               if final_state != State.SUCCESS:
                   raise AirflowException(f'Pod {self.pod.metadata.name} 
returned a failure: {remote_pod}')
               context['task_instance'].xcom_push(key='pod_name', 
value=self.pod.metadata.name)
               context['task_instance'].xcom_push(key='pod_namespace', 
value=self.namespace)
               return result
           except AirflowException as ex:
               raise AirflowException(f'Pod Launching failed: {ex}')
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to