[GitHub] [airflow] dondaum commented on issue #17604: Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events
dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-920801580

Is there anything I can do here to provide more context?
[GitHub] [airflow] dondaum commented on issue #17604: Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events
dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-899256816

Thank you for the quick response. I think I do not completely understand what you mean. Are you assuming that there is another Airflow variable or connection with the name "dbt_warehouse" (lowercase) as key? If so: I tested it on a fresh local Airflow dev instance with a clean environment, without any variable other than the one mentioned in the context section. But let me test it with another key name that I have not used so far, and let me list all Airflow variables and connections beforehand. I am asking because if the masking works as expected, then together with your latest changes to the exception handling this feature request is, at least from my side, not needed anymore.

So I ran another test. I created a fresh local Airflow instance and listed all variables and connections:

```
airflow variables list -o table
key
===
testvar
```

The variable structure is:

```
airflow variables get testvar
{
    "THIS_IS_A_RANDOM_SENSITIVE_VAR": "SuperRandomString1"
}
```

and

```
airflow connections list -o table
No data found
```

I stuck to the same example as above with:

`AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "THIS_IS_A_RANDOM_SENSITIVE_VAR"`

```Python
[2021-08-16 06:09:20,814] {{dbt_k8s_operator.py:67}} INFO - {'THIS_IS_A_RANDOM_SENSITIVE_VAR': 'SuperRandomString1'}
[2021-08-16 06:09:20,814] {{dbt_k8s_operator.py:69}} INFO - Custom KubernetesPodOperator runs K8S Task
[2021-08-16 06:09:20,823] {{taskinstance.py:1501}} ERROR - Task failed with exception
```

Then I changed only the Airflow configuration variable AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES, making the variable name all lowercase:

`AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "this_is_a_random_sensitive_var"`

```Python
[2021-08-16 06:14:19,899] {{dbt_k8s_operator.py:67}} INFO - {'THIS_IS_A_RANDOM_SENSITIVE_VAR': '***'}
[2021-08-16 06:14:19,899] {{dbt_k8s_operator.py:69}} INFO - Custom KubernetesPodOperator runs K8S Task
[2021-08-16 06:14:19,906] {{taskinstance.py:1501}} ERROR - Task failed with exception
```

I get the same behavior: the value is masked only when the configured name is lowercase.
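One way to probe this decision in isolation (a sketch, assuming Airflow 2.1.x, where `should_hide_value_for_key` in `airflow.utils.log.secrets_masker` is what consults `[core] sensitive_var_conn_names`; run it inside the Airflow environment):

```python
# Probe sketch - assumes the Airflow 2.1.x secrets masker, where
# should_hide_value_for_key() decides whether a Variable key is sensitive
# by consulting [core] sensitive_var_conn_names.
import os

from airflow.utils.log.secrets_masker import should_hide_value_for_key

# Upper-case configured name - the case that stayed unmasked above:
os.environ["AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES"] = "THIS_IS_A_RANDOM_SENSITIVE_VAR"
print(should_hide_value_for_key("THIS_IS_A_RANDOM_SENSITIVE_VAR"))  # expected: False

# Lower-case configured name - the case where masking kicks in:
os.environ["AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES"] = "this_is_a_random_sensitive_var"
print(should_hide_value_for_key("THIS_IS_A_RANDOM_SENSITIVE_VAR"))  # expected: True
```

(If the sensitive-name list is cached in your Airflow version, restart the interpreter between the two checks.)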
[GitHub] [airflow] dondaum commented on issue #17604: Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events
dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-899081566

Great! Glad I could help find this issue. So I had a look at your PR and also at the original PR that merged the masking feature. My example above should actually also work without having to change anything in our custom operator - besides your changes to the secret masking in the exceptions, I mean. Reading through the docs: whenever you use either a connection or a variable, and its key names appear in the configuration variable `AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES`, masking should be handled by default. In our setup, and in the example above, I get the env variables from an Airflow variable, so I am wondering why that does not work - besides, of course, the exception issue that you took care of.

So I tested a bit and found another strange behavior that I could not explain. Variable names in the configuration that are written only in capital letters are somehow not handled correctly, and therefore their values are not masked.

First test (masking does not work):

```Python
# Context
# Airflow variable name: testvar
# Airflow variable payload:
# {
#     "DBT_WAREHOUSE": "secret"
# }

# Configuration
AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "DBT_WAREHOUSE"

# Custom operator
class KubernetesPodOperator_(KubernetesPodOperator):
    def __init__(
        self,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)

    def _get_dummy_secret_var(self) -> List[k8s.V1EnvVar]:
        try:
            sec = Variable.get(
                "testvar",
                deserialize_json=True
            )
        except KeyError:
            self.log.warning(
                "You have to add the variable in Airflow"
            )
            sec = None
        return sec

    def execute(self, context):
        dummy_var = self._get_dummy_secret_var()
        self.log.info(dummy_var)
        self.log.info(
            f"Custom KubernetesPodOperator runs K8S Task"
        )
        return super().execute(context)
...

# Output
[2021-08-15 16:00:30,237] {{dbt_k8s_operator.py:67}} INFO - {'DBT_WAREHOUSE': 'secret'}
[2021-08-15 16:00:30,237] {{dbt_k8s_operator.py:69}} INFO - Custom KubernetesPodOperator runs K8S Task
```

Second test (masking is working) - same variable payload and the identical custom operator, only the configuration changes:

```Python
# Configuration
AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "dbt_warehouse"

# Output
[2021-08-15 16:04:18,864] {{dbt_k8s_operator.py:67}} INFO - {'DBT_WAREHOUSE': '***'}
[2021-08-15 16:04:18,864] {{dbt_k8s_operator.py:69}} INFO - Custom KubernetesPodOperator runs K8S Task
```

Thus the masking only works if you change the variable name in the Airflow config from 'DBT_WAREHOUSE' to 'dbt_warehouse'. As our variable key names are all written in capital letters, our previous masking attempts did not work. Any clue why that happens?
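For what it's worth, the observed behavior is consistent with the masker lower-casing the variable key before comparing it against the configured names, while using the configured names verbatim. A minimal sketch of that suspected matching logic (my reading of the behavior, not Airflow's actual code):

```python
# Illustrative sketch of the suspected matching logic - not Airflow's code.
# Assumption: the key under test is normalized to lowercase, while the names
# configured via AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES are used verbatim.
def should_hide(name: str, configured_names: set) -> bool:
    name = name.strip().lower()  # key is lower-cased...
    # ...so an upper-case configured entry like "DBT_WAREHOUSE" can never
    # match, while "dbt_warehouse" matches as a substring.
    return any(s in name for s in configured_names)

print(should_hide("DBT_WAREHOUSE", {"DBT_WAREHOUSE"}))  # False - not masked
print(should_hide("DBT_WAREHOUSE", {"dbt_warehouse"}))  # True  - masked
```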
[GitHub] [airflow] dondaum commented on issue #17604: Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events
dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-898663582

So I tested the workaround with a quick and dirty subclass, overriding the execute method. For each env value I call mask_secret():

```python
class KubernetesPodOperator_(KubernetesPodOperator):
    def __init__(
        self,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)

    def _get_secret_var(self) -> List[k8s.V1EnvVar]:
        try:
            sec = Variable.get(
                "testvar",
                deserialize_json=True
            )
        except KeyError:
            self.log.warning(
                "You have to add the variable in Airflow"
            )
            sec = {
                "DBT_WAREHOUSE": "Default",
            }
        return [
            k8s.V1EnvVar(
                name="DBT_DATABASE",
                value=sec["DBT_WAREHOUSE"],
            ),
        ]

    def execute(self, context):
        from airflow.utils.log.secrets_masker import mask_secret

        self.log.info(
            f"Custom KubernetesPodOperator runs K8S Task"
        )
        self.env_vars = self._get_secret_var()
        # Register every env value with the masker before the pod runs,
        # so it is redacted in subsequent log output.
        for secret_env in self.env_vars:
            mask_secret(secret_env.value)
        return super().execute(context)
```

Afterwards I had a look at the logs, and it seems that the masking now works, but only in one of the two exception messages in the logs. Here is the detailed log message:

```python
[2021-08-13 18:27:01,897] {{taskinstance.py:1501}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 368, in execute
    raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
airflow.exceptions.AirflowException: Pod passing-test.ea5fb34fd1274264baa318e7d66098de returned a failure: {'api_version': 'v1', 'kind': 'Pod', 'metadata': {'annotations': None, 'cluster_name': None, 'creation_timestamp': datetime.datetime(2021, 8, 13, 18, 26, 33, tzinfo=tzlocal()), 'deletion_grace_period_seconds': None, 'deletion_timestamp': None, 'finalizers': None, 'generate_name': None, 'generation': None, 'initializers': None, 'labels': {'airflow_version': '2.1.2', 'dag_id': 'k8s_v3', 'execution_date': '2021-08-13T151421.601854-bfa4f3805', 'foo': 'bar', 'kubernetes_pod_operator': 'True', 'task_id': 'passing-task', 'try_number': '6'}, 'managed_fields': [{'api_version': 'v1', 'fields': None, 'manager': 'OpenAPI-Generator', 'operation': 'Update', 'time': datetime.datetime(2021, 8, 13, 18, 26, 33, tzinfo=tzlocal())}, {'api_version': 'v1', 'fields': None, 'manager': 'kubelet', 'operation': 'Update', 'time': datetime.datetime(2021, 8, 13, 18, 27, tzinfo=tzlocal())}], 'name': 'passing-test.ea5fb34fd1274264baa318e7d66098de', 'namespace': 'X', 'owner_references': None, 'resource_version': '90215538', 'self_link': None, 'uid': '00145ef8-992c-42d7-9ce3-f2a7990617f0'}, 'spec': {'active_deadline_seconds': None, 'affinity': {'node_affinity': None, 'pod_affinity': None, 'pod_anti_affinity': None}, 'automount_service_account_token': None, 'containers': [{'args': ['import sys; sys.exit(1)'], 'command': ['python', '-c'], 'env': [{'name': 'DBT_DATABASE', 'value': 'secret', 'value_from': None}], 'env_from': None, 'image': 'python:3.6', 'image_pull_policy': 'IfNotPresent', 'lifecycle': None, 'liveness_probe': None, 'name': 'base', 'ports': None, 'readiness_probe': None, 'resources': {'l
```
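For completeness, this is roughly how the subclass would be plugged into a DAG (a sketch; the namespace, start date, and schedule are placeholders, while the image, command, and names are taken from the pod spec in the log above):

```python
# Hypothetical usage of the masking subclass above. The namespace, start
# date, and schedule are placeholders; image, command, and names match the
# pod spec visible in the exception log.
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="k8s_v3",
    start_date=datetime(2021, 8, 1),
    schedule_interval=None,
) as dag:
    task = KubernetesPodOperator_(
        task_id="passing-task",
        name="passing-test",
        namespace="default",  # placeholder namespace
        image="python:3.6",
        cmds=["python", "-c"],
        arguments=["import sys; sys.exit(1)"],
    )
```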
[GitHub] [airflow] dondaum commented on issue #17604: Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events
dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-898612407

Thank you both for your response and for showing me a quick workaround. I'll implement it and test it against our custom solution. We do something similar by overriding the execute() method and applying a kind of custom masking to the remote_pod object.

Yes, we could use Kubernetes secrets for our use cases as well. I assume there are pros and cons to using secrets. To my knowledge it was not possible to create secrets 'on the fly' with an instance of the KubernetesPodOperator, just for running the specific task and removing it afterwards. Even if you could, you would still generate a secret per task instance, or need logic that checks whether a secret already exists, and so on. Yes, we could simply create this secret once and then use it, but it would be inflexible if we want to change some values here and there, as we would then always need to change the secret. At the moment (see the example above) we use a V1EnvVar model instance and pass it to the KubernetesPodOperator.
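For reference, the two patterns discussed here look roughly like this (a sketch; `dbt-creds` is a hypothetical pre-created Kubernetes secret, not something from our setup):

```python
# Sketch of the two ways to get a value into the pod; 'dbt-creds' is a
# hypothetical, pre-created Kubernetes secret.
from airflow.kubernetes.secret import Secret
from kubernetes.client import models as k8s

# 1) What we do today: pass the value directly as a V1EnvVar. The value
#    comes from an Airflow Variable and ends up in the rendered pod spec.
env_vars = [k8s.V1EnvVar(name="DBT_WAREHOUSE", value="secret")]

# 2) The alternative: reference a pre-created Kubernetes secret, so the
#    value itself never appears in the rendered pod spec or the logs.
secrets = [
    Secret(
        deploy_type="env",
        deploy_target="DBT_WAREHOUSE",
        secret="dbt-creds",
        key="DBT_WAREHOUSE",
    )
]

# Either list is passed to KubernetesPodOperator via env_vars= / secrets=.
```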
[GitHub] [airflow] dondaum commented on issue #17604: Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events
dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-898580071

We run version v2.1.2. So I had another look at the 'Secrets Masker' feature, as I had already configured it without any positive effect on the environment variable masking. But most likely our setup and configuration are wrong. So let's give some context: we have a custom logging config that uses Azure blob storage for remote logging, where we have added the new filter to the remote task handler:

```python
DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {"format": LOG_FORMAT},
        "airflow_coloured": {
            "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            "class": COLORED_FORMATTER_CLASS if COLORED_LOG else "logging.Formatter",
        },
    },
    "filters": {
        "mask_secrets": {
            "()": "airflow.utils.log.secrets_masker.SecretsMasker",
        },
    },
    "handlers": {
        "console": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow_coloured",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
            "filename_template": FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
        "processor": {
            "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",  # noqa: E501
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
            "filename_template": PROCESSOR_FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
    },
    "loggers": {
        "airflow.processor": {
            "handlers": ["processor"],
            "level": LOG_LEVEL,
            "propagate": False,
        },
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            "propagate": False,
            "filters": ["mask_secrets"],
        },
        "flask_appbuilder": {
            "handler": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets"],
    },
}

EXTRA_LOGGER_NAMES: str = conf.get(
    "logging", "EXTRA_LOGGER_NAMES", fallback=None
)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handler": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)

DEFAULT_DAG_PARSING_LOGGING_CONFIG: Dict[str, Dict[str, Dict[str, Any]]] = {
    "handlers": {
        "processor_manager": {
            "class": "logging.handlers.RotatingFileHandler",
            "formatter": "airflow",
            "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            "mode": "a",
            "maxBytes": 104857600,  # 100MB
            "backupCount": 5,
        }
    },
    "loggers": {
        "airflow.processor_manager": {
            "handlers": ["processor_manager"],
            "level": LOG_LEVEL,
            "propagate": False,
        }
    },
}

...

elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
    REMOTE_BASE_LOG_FOLDER = TARGET_HOSTNAME
    WASB_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
        "task": {
            "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",  # noqa: E501
            "formatter": "airflow",
            "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
            "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
            "wasb_container": "airflow-logs",
            "filename_template": FILENAME_TEMPLATE,
            "delete_local_copy": False,
            "filters": ["mask_secrets"],
        },
    }
```

In our Airflow configuration we have (we are using helm charts):

`AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "DBT_PWD,DBT_WAREHOUSE"`

I just had a look at the Airflow docs to verify that I am not missing any other important config. I found AIRFLOW__CORE__HIDE_SENSITIVE_VAR_CONN_FIELDS, but it is true by default and we do not change it.
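For anyone reproducing this setup: a custom logging config like the one above is typically activated via the `logging_config_class` option (a sketch; the module path `log_config` is a placeholder for wherever the config dict lives on the scheduler's PYTHONPATH):

```
# airflow.cfg (or the corresponding helm value); 'log_config' is a
# placeholder module name, not the path from the original setup.
[logging]
logging_config_class = log_config.DEFAULT_LOGGING_CONFIG

# equivalently, as an environment variable:
# AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS=log_config.DEFAULT_LOGGING_CONFIG
```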