This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c8af92c6e  [Task] add UT for rotate log setting in worker task (#41923)
6c8af92c6e is described below

commit 6c8af92c6e91d5ccc0165187fb389334ff688189
Author: Huanjie Guo <[email protected]>
AuthorDate: Sun Sep 1 22:33:41 2024 +0800

     [Task] add UT for rotate log setting in worker task (#41923)
---
 .../advanced-logging-configuration.rst             |  4 +-
 tests/utils/test_log_handlers.py                   | 87 ++++++++++++++++++++++
 2 files changed, 88 insertions(+), 3 deletions(-)

diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst
index 6339735664..2cfc44e725 100644
--- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst
+++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst
@@ -176,9 +176,7 @@ Example of limiting the size of tasks:
           deepcopy(DEFAULT_LOGGING_CONFIG),
           {
               "handlers": {
-                  "task": {
-                      "max_bytes": 104857600,  # 100MB
-                  }
+                  "task": {"max_bytes": 104857600, "backup_count": 1}  # 100MB and keep 1 history rotate log.
               }
           },
       )
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 81108131d8..6235d854d4 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -30,6 +30,7 @@ from unittest.mock import patch
 import pendulum
 import pytest
 from kubernetes.client import models as k8s
+from pydantic.v1.utils import deep_update
 from requests.adapters import Response
 
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
@@ -245,6 +246,92 @@ class TestFileTaskLogHandler:
         # Remove the generated tmp log file.
         os.remove(log_filename)
 
+    def test_file_task_handler_rotate_size_limit(self):
+        def reset_log_config(update_conf):
+            import logging.config
+
+            logging_config = DEFAULT_LOGGING_CONFIG
+            logging_config = deep_update(logging_config, update_conf)
+            logging.config.dictConfig(logging_config)
+
+        def task_callable(ti):
+            pass
+
+        max_bytes_size = 60000
+        update_conf = {"handlers": {"task": {"max_bytes": max_bytes_size, "backup_count": 1}}}
+        reset_log_config(update_conf)
+        dag = DAG("dag_for_testing_file_task_handler_rotate_size_limit", start_date=DEFAULT_DATE)
+        task = PythonOperator(
+            task_id="task_for_testing_file_log_handler_rotate_size_limit",
+            python_callable=task_callable,
+            dag=dag,
+        )
+        dagrun = dag.create_dagrun(
+            run_type=DagRunType.MANUAL,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
+        )
+        ti = TaskInstance(task=task, run_id=dagrun.run_id)
+
+        ti.try_number = 1
+        ti.state = State.RUNNING
+
+        logger = ti.log
+        ti.log.disabled = False
+
+        file_handler = next(
+            (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
+        )
+        assert file_handler is not None
+
+        set_context(logger, ti)
+        assert file_handler.handler is not None
+        # We expect set_context generates a file locally, this is the first log file
+        # in this test, it should generate 2 when it finishes.
+        log_filename = file_handler.handler.baseFilename
+        assert os.path.isfile(log_filename)
+        assert log_filename.endswith("1.log"), log_filename
+
+        # mock to generate 2000 lines of log, the total size is larger than max_bytes_size
+        for i in range(1, 2000):
+            logger.info("this is a Test. %s", i)
+
+        # this is the rotate log file
+        log_rotate_1_name = log_filename + ".1"
+        assert os.path.isfile(log_rotate_1_name)
+
+        current_file_size = os.path.getsize(log_filename)
+        rotate_file_1_size = os.path.getsize(log_rotate_1_name)
+        assert rotate_file_1_size > max_bytes_size * 0.9
+        assert rotate_file_1_size < max_bytes_size
+        assert current_file_size < max_bytes_size
+
+        # Return value of read must be a tuple of list and list.
+        logs, metadatas = file_handler.read(ti)
+
+        # the log content should have the filename of both current log file and rotate log file.
+        find_current_log = False
+        find_rotate_log_1 = False
+        for log in logs:
+            if log_filename in str(log):
+                find_current_log = True
+            if log_rotate_1_name in str(log):
+                find_rotate_log_1 = True
+        assert find_current_log is True
+        assert find_rotate_log_1 is True
+
+        assert isinstance(logs, list)
+        # Logs for running tasks should show up too.
+        assert isinstance(logs, list)
+        assert isinstance(metadatas, list)
+        assert len(logs) == len(metadatas)
+        assert isinstance(metadatas[0], dict)
+
+        # Remove the two generated tmp log files.
+        os.remove(log_filename)
+        os.remove(log_rotate_1_name)
+
     
@patch("airflow.utils.log.file_task_handler.FileTaskHandler._read_from_local")
     def test__read_when_local(self, mock_read_local, create_task_instance):
         """

Reply via email to