This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new dc94afd7f8  Add option of sending DAG parser logs to stdout. (#25754)
dc94afd7f8 is described below

commit dc94afd7f816f248b21bb49cfc38088692c595d0
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Wed Sep 7 05:06:58 2022 -0500

    Add option of sending DAG parser logs to stdout. (#25754)

    * Add option of sending DAG parser logs to stdout.
    * Make long config line into two lines.
    * Reverse order of tests.
    * Update tests with new changes.
---
 airflow/config_templates/airflow_local_settings.py | 15 +++++-
 airflow/config_templates/config.yml                | 15 ++++++
 airflow/config_templates/default_airflow.cfg       |  6 +++
 airflow/dag_processing/processor.py                | 55 ++++++++++++----------
 tests/dag_processing/test_processor.py             | 48 ++++++++++++++++++-
 5 files changed, 113 insertions(+), 26 deletions(-)

diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 16f225d38b..9e145a0b8b 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -37,6 +37,7 @@ LOG_LEVEL: str = conf.get_mandatory_value('logging', 'LOGGING_LEVEL').upper()
 FAB_LOG_LEVEL: str = conf.get_mandatory_value('logging', 'FAB_LOGGING_LEVEL').upper()
 
 LOG_FORMAT: str = conf.get_mandatory_value('logging', 'LOG_FORMAT')
+DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value('logging', 'DAG_PROCESSOR_LOG_FORMAT')
 
 LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
     'logging', 'LOG_FORMATTER_CLASS', fallback='airflow.utils.log.timezone_aware.TimezoneAware'
@@ -48,6 +49,8 @@ COLORED_LOG: bool = conf.getboolean('logging', 'COLORED_CONSOLE_LOG')
 
 COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value('logging', 'COLORED_FORMATTER_CLASS')
 
+DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value('logging', 'DAG_PROCESSOR_LOG_TARGET')
+
 BASE_LOG_FOLDER: str = conf.get_mandatory_value('logging', 'BASE_LOG_FOLDER')
 
 PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
@@ -75,6 +78,10 @@ DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
             'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
             'class': COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
         },
+        'source_processor': {
+            'format': DAG_PROCESSOR_LOG_FORMAT,
+            'class': LOG_FORMATTER_CLASS,
+        },
     },
     'filters': {
         'mask_secrets': {
@@ -101,10 +108,16 @@ DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
             'filename_template': PROCESSOR_FILENAME_TEMPLATE,
             'filters': ['mask_secrets'],
         },
+        'processor_to_stdout': {
+            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
+            'formatter': 'source_processor',
+            'stream': 'sys.stdout',
+            'filters': ['mask_secrets'],
+        },
     },
     'loggers': {
         'airflow.processor': {
-            'handlers': ['processor'],
+            'handlers': ['processor_to_stdout' if DAG_PROCESSOR_LOG_TARGET == "stdout" else 'processor'],
             'level': LOG_LEVEL,
             'propagate': False,
         },
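A note on how the handler selection above behaves at runtime: the 'airflow.processor'
logger ends up with exactly one handler, chosen once when DEFAULT_LOGGING_CONFIG is
built. The following is a minimal stdlib-only sketch of that selection, not Airflow
code: the handler classes, the log file name, and the hard-coded target value are
stand-ins for what Airflow reads from its [logging] section (the %% escapes in the
cfg templates become single % in the actual format string):

    import logging
    import logging.config

    DAG_PROCESSOR_LOG_TARGET = "stdout"  # assumed value; Airflow reads [logging] dag_processor_log_target

    logging.config.dictConfig(
        {
            'version': 1,
            'formatters': {
                'source_processor': {
                    'format': '[%(asctime)s] [SOURCE:DAG_PROCESSOR] %(levelname)s - %(message)s',
                },
            },
            'handlers': {
                # stand-in for airflow.utils.log.logging_mixin.RedirectStdHandler
                'processor_to_stdout': {
                    'class': 'logging.StreamHandler',
                    'formatter': 'source_processor',
                    'stream': 'ext://sys.stdout',
                },
                # stand-in for Airflow's file-based processor handler
                'processor': {
                    'class': 'logging.FileHandler',
                    'filename': 'dag_processor_demo.log',
                },
            },
            'loggers': {
                'airflow.processor': {
                    # the same one-of-two selection the patch adds
                    'handlers': ['processor_to_stdout' if DAG_PROCESSOR_LOG_TARGET == "stdout" else 'processor'],
                    'level': 'INFO',
                    'propagate': False,
                },
            },
        }
    )

    logging.getLogger('airflow.processor').info("lands on stdout when the target is 'stdout'")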
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 4537b62d6b..a6d19e7e2c 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -659,6 +659,21 @@
       type: string
       example: ~
       default: "%%(asctime)s %%(levelname)s - %%(message)s"
+    - name: dag_processor_log_target
+      description: Where to send dag parser logs. If "file",
+        logs are sent to log files defined by child_process_log_directory.
+      version_added: 2.4.0
+      type: string
+      example: ~
+      default: "file"
+    - name: dag_processor_log_format
+      description: |
+        Format of Dag Processor Log line
+      version_added: 2.3.4
+      type: string
+      example: ~
+      default: "[%%(asctime)s] [SOURCE:DAG_PROCESSOR]
+        {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s"
     - name: log_formatter_class
       description: ~
       version_added: 2.3.4
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 7cd116369e..2c88a759c3 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -366,6 +366,12 @@ colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatte
 # Format of Log line
 log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
 simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
+
+# Where to send dag parser logs. If "file", logs are sent to log files defined by child_process_log_directory.
+dag_processor_log_target = file
+
+# Format of Dag Processor Log line
+dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
 log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
 
 # Specify prefix pattern like mentioned below with stream handler TaskHandlerWithCustomFormatter
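For anyone who wants to try the new behaviour, the switch is the [logging] option
shown in default_airflow.cfg above. An illustrative airflow.cfg fragment follows;
the same value can also be supplied through the AIRFLOW__LOGGING__DAG_PROCESSOR_LOG_TARGET
environment variable, per Airflow's usual config conventions:

    [logging]
    # "file" (the default) keeps per-DAG-file logs under child_process_log_directory;
    # "stdout" sends parser logs to the parsing process's stdout instead.
    dag_processor_log_target = stdout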
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index fa1eb46c29..dc6da4b052 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -143,31 +143,38 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
         set_context(log, file_path)
         setproctitle(f"airflow scheduler - DagFileProcessor {file_path}")
 
+        def _handle_dag_file_processing():
+            # Re-configure the ORM engine as there are issues with multiple processes
+            settings.configure_orm()
+
+            # Change the thread name to differentiate log lines. This is
+            # really a separate process, but changing the name of the
+            # process doesn't work, so changing the thread name instead.
+            threading.current_thread().name = thread_name
+
+            log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
+            dag_file_processor = DagFileProcessor(dag_ids=dag_ids, dag_directory=dag_directory, log=log)
+            result: Tuple[int, int] = dag_file_processor.process_file(
+                file_path=file_path,
+                pickle_dags=pickle_dags,
+                callback_requests=callback_requests,
+            )
+            result_channel.send(result)
+
         try:
-            # redirect stdout/stderr to log
-            with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
-                StreamLogWriter(log, logging.WARN)
-            ), Stats.timer() as timer:
-                # Re-configure the ORM engine as there are issues with multiple processes
-                settings.configure_orm()
-
-                # Change the thread name to differentiate log lines. This is
-                # really a separate process, but changing the name of the
-                # process doesn't work, so changing the thread name instead.
-                threading.current_thread().name = thread_name
-
-                log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
-                dag_file_processor = DagFileProcessor(
-                    dag_ids=dag_ids,
-                    dag_directory=dag_directory,
-                    log=log,
-                )
-                result: Tuple[int, int] = dag_file_processor.process_file(
-                    file_path=file_path,
-                    pickle_dags=pickle_dags,
-                    callback_requests=callback_requests,
-                )
-                result_channel.send(result)
+            DAG_PROCESSOR_LOG_TARGET = conf.get_mandatory_value('logging', 'DAG_PROCESSOR_LOG_TARGET')
+            if DAG_PROCESSOR_LOG_TARGET == "stdout":
+                with Stats.timer() as timer:
+                    _handle_dag_file_processing()
+            else:
+                # The following line ensures that stdout goes to the same destination as the logs. If stdout
+                # gets sent to logs and logs are sent to stdout, this leads to an infinite loop. This
+                # necessitates this conditional based on the value of DAG_PROCESSOR_LOG_TARGET.
+                with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
+                    StreamLogWriter(log, logging.WARN)
+                ), Stats.timer() as timer:
+                    _handle_dag_file_processing()
             log.info("Processing %s took %.3f seconds", file_path, timer.duration)
         except Exception:
             # Log exceptions through the logging framework.
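The conditional above matters because of the feedback hazard called out in the inline
comment: redirecting stdout into a logger whose handler itself writes to sys.stdout
would make every emitted record re-enter the redirected stream and loop forever.
Below is a minimal stdlib sketch of the redirect pattern, safe here because the
logger writes to a file; LogWriter is a simplified stand-in for Airflow's
StreamLogWriter, not the real class:

    import logging
    from contextlib import redirect_stdout

    class LogWriter:
        """File-like shim that forwards print() output into a logger."""

        def __init__(self, logger: logging.Logger, level: int):
            self.logger = logger
            self.level = level

        def write(self, message: str) -> None:
            # print() also emits a bare newline; skip whitespace-only chunks
            if message.strip():
                self.logger.log(self.level, message.rstrip())

        def flush(self) -> None:
            # required by the file protocol; nothing is buffered here
            pass

    # File target: the handler does NOT write to stdout, so no feedback loop.
    logging.basicConfig(filename="processor_demo.log", level=logging.INFO)
    log = logging.getLogger("demo.processor")

    with redirect_stdout(LogWriter(log, logging.INFO)):
        print("captured into processor_demo.log instead of the console")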
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index b0e09d0417..a41a97bd5a 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -28,7 +28,7 @@ from airflow import settings
 from airflow.callbacks.callback_requests import TaskCallbackRequest
 from airflow.configuration import TEST_DAGS_FOLDER, conf
 from airflow.dag_processing.manager import DagFileProcessorAgent
-from airflow.dag_processing.processor import DagFileProcessor
+from airflow.dag_processing.processor import DagFileProcessor, DagFileProcessorProcess
 from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance, errors
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance
@@ -788,6 +788,52 @@ class TestDagFileProcessor:
         assert import_error.stacktrace == expected_stacktrace.format(invalid_dag_filename)
         session.rollback()
 
+    @conf_vars({("logging", "dag_processor_log_target"): "stdout"})
+    @mock.patch('airflow.dag_processing.processor.settings.dispose_orm', MagicMock)
+    @mock.patch('airflow.dag_processing.processor.redirect_stdout')
+    def test_dag_parser_output_when_logging_to_stdout(self, mock_redirect_stdout_for_file):
+        processor = DagFileProcessorProcess(
+            file_path='abc.txt',
+            pickle_dags=False,
+            dag_ids=[],
+            dag_directory=[],
+            callback_requests=[],
+        )
+        processor._run_file_processor(
+            result_channel=MagicMock(),
+            parent_channel=MagicMock(),
+            file_path="fake_file_path",
+            pickle_dags=False,
+            dag_ids=[],
+            thread_name="fake_thread_name",
+            callback_requests=[],
+            dag_directory=[],
+        )
+        mock_redirect_stdout_for_file.assert_not_called()
+
+    @conf_vars({("logging", "dag_processor_log_target"): "file"})
+    @mock.patch('airflow.dag_processing.processor.settings.dispose_orm', MagicMock)
+    @mock.patch('airflow.dag_processing.processor.redirect_stdout')
+    def test_dag_parser_output_when_logging_to_file(self, mock_redirect_stdout_for_file):
+        processor = DagFileProcessorProcess(
+            file_path='abc.txt',
+            pickle_dags=False,
+            dag_ids=[],
+            dag_directory=[],
+            callback_requests=[],
+        )
+        processor._run_file_processor(
+            result_channel=MagicMock(),
+            parent_channel=MagicMock(),
+            file_path="fake_file_path",
+            pickle_dags=False,
+            dag_ids=[],
+            thread_name="fake_thread_name",
+            callback_requests=[],
+            dag_directory=[],
+        )
+        mock_redirect_stdout_for_file.assert_called_once()
+
 
 class TestProcessorAgent:
     @pytest.fixture(autouse=True)
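To exercise just the two new tests from a development checkout, a pytest keyword
filter that matches both names should work, e.g.
pytest tests/dag_processing/test_processor.py -k "dag_parser_output".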