[GitHub] [airflow] dondaum commented on issue #17604: Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events

2021-09-16


dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-920801580


   Anything I can do here to provide more context?






[GitHub] [airflow] dondaum commented on issue #17604: Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events

2021-08-15


dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-899256816


   Thank you for the quick response.

   I am not sure I completely understand what you mean. Is your assumption that there is another Airflow variable or connection with the name "dbt_warehouse" (lowercase) as key?

   If so: I tested it on a fresh local Airflow dev instance with a clean environment, without any other variable than the one mentioned in the context section above.

   But let me test it with another key name that I have not used so far, and let me list all Airflow variables and connections beforehand.

   I am asking because, if the masking works as expected, then together with your latest changes to the exception handling this feature request, at least from my side, is not needed anymore.

   So I ran another test: I created a fresh local Airflow instance and listed all variables and connections:
   ```
   airflow variables list -o table
   key
   ===
   testvar
   ```
   the variable structure is:
   ```
   airflow variables get testvar
   {
   "THIS_IS_A_RANDOM_SENSITIVE_VAR": "SuperRandomString1"
   }
   ```
   
   and 
   ```
   airflow connections list -o table
   No data found
   ```
   
   I stuck with the same example above, using:
   `
   AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "THIS_IS_A_RANDOM_SENSITIVE_VAR"
   `
   ```Python
   [2021-08-16 06:09:20,814] {{dbt_k8s_operator.py:67}} INFO - {'THIS_IS_A_RANDOM_SENSITIVE_VAR': 'SuperRandomString1'}
   [2021-08-16 06:09:20,814] {{dbt_k8s_operator.py:69}} INFO - Custom KubernetesPodOperator runs K8S Task
   [2021-08-16 06:09:20,823] {{taskinstance.py:1501}} ERROR - Task failed with exception
   
   ```
   
   Then I only changed the Airflow configuration variable AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES, switching the variable name to all lowercase:
   
   `
   AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "this_is_a_random_sensitive_var"
   `
   
   ```Python
   [2021-08-16 06:14:19,899] {{dbt_k8s_operator.py:67}} INFO - {'THIS_IS_A_RANDOM_SENSITIVE_VAR': '***'}
   [2021-08-16 06:14:19,899] {{dbt_k8s_operator.py:69}} INFO - Custom KubernetesPodOperator runs K8S Task
   [2021-08-16 06:14:19,906] {{taskinstance.py:1501}} ERROR - Task failed with exception
   
   ```
   
   I get the same behavior.






[GitHub] [airflow] dondaum commented on issue #17604: Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events

2021-08-15


dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-899081566


   Great! Glad I could help find this issue.

   I had a look at your PR and also at the original PR that merged the masking feature. My example above should actually work without having to change anything in our custom operator - apart from your changes to the secret masking in the exceptions.

   Reading through the docs, whenever you use a connection or a variable and its key name appears in the configuration variable `AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES`, the masking should be handled by default.

   In our setup, and in the example above, I get the env variables from an Airflow variable, so I am wondering why that does not work - apart, of course, from the exception issue that you took care of.

   So I tested a bit more and found another strange behavior that I cannot explain. It seems that variable names in the configuration written only in capital letters are not handled correctly, and therefore their values are not masked.
   
   First test (masking does not work):
   ```Python
   # Context

   # Airflow variable name: testvar
   # Airflow variable payload:
   # {
   #     "DBT_WAREHOUSE": "secret"
   # }

   # Configuration
   AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "DBT_WAREHOUSE"

   # Custom operator
   class KubernetesPodOperator_(KubernetesPodOperator):

       def __init__(
           self,
           *args,
           **kwargs,
       ):
           super().__init__(*args, **kwargs)

       def _get_dummy_secret_var(self) -> List[k8s.V1EnvVar]:
           try:
               sec = Variable.get(
                   "testvar", deserialize_json=True
               )
           except KeyError:
               self.log.warning(
                   "You have to add the variable in Airflow"
               )
               sec = None
           return sec

       def execute(self, context):
           dummy_var = self._get_dummy_secret_var()
           self.log.info(dummy_var)

           self.log.info(
               f"Custom KubernetesPodOperator runs K8S Task"
           )
           return super().execute(context)
   ...
   # Output
   [2021-08-15 16:00:30,237] {{dbt_k8s_operator.py:67}} INFO - {'DBT_WAREHOUSE': 'secret'}
   [2021-08-15 16:00:30,237] {{dbt_k8s_operator.py:69}} INFO - Custom KubernetesPodOperator runs K8S Task
   ```
   
   Second test (masking is working):
   ```Python
   # Context

   # Airflow variable name: testvar
   # Airflow variable payload:
   # {
   #     "DBT_WAREHOUSE": "secret"
   # }

   # Configuration
   AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "dbt_warehouse"

   # Custom operator
   class KubernetesPodOperator_(KubernetesPodOperator):

       def __init__(
           self,
           *args,
           **kwargs,
       ):
           super().__init__(*args, **kwargs)

       def _get_dummy_secret_var(self) -> List[k8s.V1EnvVar]:
           try:
               sec = Variable.get(
                   "testvar", deserialize_json=True
               )
           except KeyError:
               self.log.warning(
                   "You have to add the variable in Airflow"
               )
               sec = None
           return sec

       def execute(self, context):
           dummy_var = self._get_dummy_secret_var()
           self.log.info(dummy_var)

           self.log.info(
               f"Custom KubernetesPodOperator runs K8S Task"
           )
           return super().execute(context)
   ...
   # Output
   [2021-08-15 16:04:18,864] {{dbt_k8s_operator.py:67}} INFO - {'DBT_WAREHOUSE': '***'}
   [2021-08-15 16:04:18,864] {{dbt_k8s_operator.py:69}} INFO - Custom KubernetesPodOperator runs K8S Task
   ```
   
   So the masking only works if you change the variable name in the Airflow config from 'DBT_WAREHOUSE' to 'dbt_warehouse'. As our variable key names are all written in capital letters, our previous masking attempts did not work.
   
   Any clue why that happens?
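
   For illustration, here is a minimal sketch of the symptom. This is explicitly not Airflow's implementation - just a hypothetical, case-sensitive membership check that would produce exactly this behavior:

   ```python
   # Hypothetical sketch only - NOT Airflow's code.
   # Assumption: the key being checked is lowercased, while the names configured
   # via SENSITIVE_VAR_CONN_NAMES are kept exactly as written.
   sensitive_names = {"DBT_WAREHOUSE"}  # all-uppercase config entry

   def should_mask(key: str) -> bool:
       return key.lower() in sensitive_names

   print(should_mask("DBT_WAREHOUSE"))  # False -> value is not masked

   sensitive_names = {"dbt_warehouse"}  # lowercase config entry
   print(should_mask("DBT_WAREHOUSE"))  # True -> value is masked
   ```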
   






[GitHub] [airflow] dondaum commented on issue #17604: Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events

2021-08-13


dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-898663582


   So I tested the workaround with a quick and dirty subclass that overrides the execute method. For each env var value I call mask_secret():
   
   ```python
   from typing import List

   from kubernetes.client import models as k8s

   from airflow.models import Variable
   from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
       KubernetesPodOperator,
   )


   class KubernetesPodOperator_(KubernetesPodOperator):

       def __init__(
           self,
           *args,
           **kwargs,
       ):
           super().__init__(*args, **kwargs)

       def _get_secret_var(self) -> List[k8s.V1EnvVar]:
           try:
               sec = Variable.get(
                   "testvar", deserialize_json=True
               )
           except KeyError:
               self.log.warning(
                   "You have to add the variable in Airflow"
               )
               sec = {
                   "DBT_WAREHOUSE": "Default",
               }
           return [
               k8s.V1EnvVar(
                   name="DBT_DATABASE",
                   value=sec["DBT_WAREHOUSE"],
               ),
           ]

       def execute(self, context):
           from airflow.utils.log.secrets_masker import mask_secret

           self.log.info(
               f"Custom KubernetesPodOperator runs K8S Task"
           )
           self.env_vars = self._get_secret_var()
           # Register every env var value with the secrets masker so it is
           # redacted in the task instance logs.
           for secret_env in self.env_vars:
               mask_secret(secret_env.value)
           return super().execute(context)
   ```
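
   For context, this is roughly how the subclass is wired into a DAG. It is only a minimal sketch: the schedule, namespace, and start date are made up, while the image, command, and task/pod names mirror the pod shown in the traceback below.

   ```python
   from datetime import datetime

   from airflow import DAG

   # Sketch only: assumes the KubernetesPodOperator_ subclass above is importable.
   with DAG(
       dag_id="k8s_v3",
       start_date=datetime(2021, 8, 1),
       schedule_interval=None,
       catchup=False,
   ) as dag:
       passing_task = KubernetesPodOperator_(
           task_id="passing-task",
           name="passing-test",
           namespace="default",
           image="python:3.6",
           cmds=["python", "-c"],
           arguments=["import sys; sys.exit(1)"],
       )
   ```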
   
   Afterwards I had a look at the logs, and it seems the masking now works, but only in one of the two exception messages in the logs.

   Here is the detailed log message:
   ```python
   [2021-08-13 18:27:01,897] {{taskinstance.py:1501}} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 368, in execute
       raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
   airflow.exceptions.AirflowException: Pod passing-test.ea5fb34fd1274264baa318e7d66098de returned a failure: {'api_version': 'v1',
'kind': 'Pod',
'metadata': {'annotations': None,
 'cluster_name': None,
 'creation_timestamp': datetime.datetime(2021, 8, 13, 18, 26, 33, tzinfo=tzlocal()),
 'deletion_grace_period_seconds': None,
 'deletion_timestamp': None,
 'finalizers': None,
 'generate_name': None,
 'generation': None,
 'initializers': None,
 'labels': {'airflow_version': '2.1.2',
'dag_id': 'k8s_v3',
 'execution_date': '2021-08-13T151421.601854-bfa4f3805',
'foo': 'bar',
'kubernetes_pod_operator': 'True',
'task_id': 'passing-task',
'try_number': '6'},
 'managed_fields': [{'api_version': 'v1',
 'fields': None,
 'manager': 'OpenAPI-Generator',
 'operation': 'Update',
 'time': datetime.datetime(2021, 8, 13, 18, 26, 33, tzinfo=tzlocal())},
{'api_version': 'v1',
 'fields': None,
 'manager': 'kubelet',
 'operation': 'Update',
 'time': datetime.datetime(2021, 8, 13, 18, 27, tzinfo=tzlocal())}],
 'name': 'passing-test.ea5fb34fd1274264baa318e7d66098de',
 'namespace': 'X',
 'owner_references': None,
 'resource_version': '90215538',
 'self_link': None,
 'uid': '00145ef8-992c-42d7-9ce3-f2a7990617f0'},
'spec': {'active_deadline_seconds': None,
 'affinity': {'node_affinity': None,
  'pod_affinity': None,
  'pod_anti_affinity': None},
 'automount_service_account_token': None,
 'containers': [{'args': ['import sys; sys.exit(1)'],
 'command': ['python', '-c'],
 'env': [{'name': 'DBT_DATABASE',
  'value': 'secret',
  'value_from': None}],
 'env_from': None,
 'image': 'python:3.6',
 'image_pull_policy': 'IfNotPresent',
 'lifecycle': None,
 'liveness_probe': None,
 'name': 'base',
 'ports': None,
 'readiness_probe': None,
 'resources': {'l
   ```

[GitHub] [airflow] dondaum commented on issue #17604: Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events

2021-08-13


dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-898612407


   Thank you both for your response and for showing me a quick workaround.

   I'll implement it and test it against our custom solution. We do something similar by overriding the execute() method and applying a kind of custom masking to the remote_pod object.

   Yes, we could use Kubernetes secrets for our use cases as well. I assume there are pros and cons to using secrets. To my knowledge it is not possible to create secrets 'on the fly' with an instance of the KubernetesPodOperator, just for running the specific task, and to remove them afterwards. Even if you could, you would still generate a secret per task instance, or need logic that checks whether a secret already exists, and so on. Yes, we could simply create the secret once and then reuse it, but that would be inflexible whenever we want to change some values here and there, as we would then always need to change the secret.

   At the moment (see example above) we build a V1EnvVar model instance and pass it to the KubernetesPodOperator, as sketched below.
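
   A minimal sketch of that pattern (the env var name and value are hypothetical; the alternative would be the `secrets` parameter together with `airflow.kubernetes.secret.Secret`, which requires a pre-created Kubernetes secret):

   ```python
   from kubernetes.client import models as k8s

   from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
       KubernetesPodOperator,
   )

   # Hypothetical env var, built e.g. from an Airflow variable.
   env_vars = [k8s.V1EnvVar(name="DBT_WAREHOUSE", value="secret")]

   task = KubernetesPodOperator(
       task_id="passing-task",
       name="passing-test",
       namespace="default",
       image="python:3.6",
       env_vars=env_vars,  # what we do today: plain env vars on the pod spec
       # secrets=[...]     # the Kubernetes-secret alternative discussed above
   )
   ```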






[GitHub] [airflow] dondaum commented on issue #17604: Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events

2021-08-13


dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-898580071


   We run version v2.1.2.
   
   So I had another look at the 'Secret Masker' feature, as I had already configured it without any positive effect on the environment variable masking.

   But most likely our setup and configuration are wrong, so let me give some context:

   We have a custom logging config that uses Azure Blob Storage for remote logging, where we have added the new filter on the remote task handler:
   ```python
   DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
       "version": 1,
       "disable_existing_loggers": False,
       "formatters": {
           "airflow": {"format": LOG_FORMAT},
           "airflow_coloured": {
               "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
               "class": COLORED_FORMATTER_CLASS
               if COLORED_LOG
               else "logging.Formatter",
           },
       },
       "filters": {
           "mask_secrets": {
               "()": "airflow.utils.log.secrets_masker.SecretsMasker",
           },
       },
       "handlers": {
           "console": {
               "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
               "formatter": "airflow_coloured",
               "stream": "sys.stdout",
               "filters": ["mask_secrets"],
           },
           "task": {
               "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
               "formatter": "airflow",
               "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
               "filename_template": FILENAME_TEMPLATE,
               "filters": ["mask_secrets"],
           },
           "processor": {
               "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",  # noqa: E501
               "formatter": "airflow",
               "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
               "filename_template": PROCESSOR_FILENAME_TEMPLATE,
               "filters": ["mask_secrets"],
           },
       },
       "loggers": {
           "airflow.processor": {
               "handlers": ["processor"],
               "level": LOG_LEVEL,
               "propagate": False,
           },
           "airflow.task": {
               "handlers": ["task"],
               "level": LOG_LEVEL,
               "propagate": False,
               "filters": ["mask_secrets"],
           },
           "flask_appbuilder": {
               "handler": ["console"],
               "level": FAB_LOG_LEVEL,
               "propagate": True,
           },
       },
       "root": {
           "handlers": ["console"],
           "level": LOG_LEVEL,
           "filters": ["mask_secrets"],
       },
   }

   EXTRA_LOGGER_NAMES: str = conf.get(
       "logging", "EXTRA_LOGGER_NAMES", fallback=None
   )
   if EXTRA_LOGGER_NAMES:
       new_loggers = {
           logger_name.strip(): {
               "handler": ["console"],
               "level": LOG_LEVEL,
               "propagate": True,
           }
           for logger_name in EXTRA_LOGGER_NAMES.split(",")
       }
       DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)

   DEFAULT_DAG_PARSING_LOGGING_CONFIG: Dict[str, Dict[str, Dict[str, Any]]] = {
       "handlers": {
           "processor_manager": {
               "class": "logging.handlers.RotatingFileHandler",
               "formatter": "airflow",
               "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
               "mode": "a",
               "maxBytes": 104857600,  # 100MB
               "backupCount": 5,
           }
       },
       "loggers": {
           "airflow.processor_manager": {
               "handlers": ["processor_manager"],
               "level": LOG_LEVEL,
               "propagate": False,
           }
       },
   }

   ...

   elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
       REMOTE_BASE_LOG_FOLDER = TARGET_HOSTNAME
       WASB_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
           "task": {
               "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",  # noqa: E501
               "formatter": "airflow",
               "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
               "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
               "wasb_container": "airflow-logs",
               "filename_template": FILENAME_TEMPLATE,
               "delete_local_copy": False,
               "filters": ["mask_secrets"],
           },
       }
   ```
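
   For completeness, Airflow is pointed at this dict via the logging_config_class setting; the module path below is just a placeholder for wherever our custom config module actually lives:

   ```python
   # e.g. in the helm values / environment (placeholder module path):
   #   AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS: "log_config.DEFAULT_LOGGING_CONFIG"
   #
   # Quick sanity check from a Python shell inside the running environment:
   from airflow.configuration import conf

   print(conf.get("logging", "logging_config_class"))
   ```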
   
   In our Airflow configuration we have (we are using helm charts):
   
   `
   AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "DBT_PWD,DBT_WAREHOUSE"
   `
   
   I just had a look at the Airflow docs to verify that I am not missing any other important config. I found AIRFLOW__CORE__HIDE_SENSITIVE_VAR_CONN_FIELDS, but it is true by default and we do not change it.
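
   A quick way to double-check the effective values (a sketch, assuming a Python shell inside the running environment):

   ```python
   from airflow.configuration import conf

   # Both options live in the [core] section in 2.1.x.
   print(conf.getboolean("core", "hide_sensitive_var_conn_fields"))  # True by default
   print(conf.get("core", "sensitive_var_conn_names"))               # "DBT_PWD,DBT_WAREHOUSE"
   ```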