This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f112cfbc1 Improve Code Coverage for Base Executor (#35260)
6f112cfbc1 is described below

commit 6f112cfbc1091ccde518aed2a6c99f65a75e9dc2
Author: Owen Leung <owen.leu...@gmail.com>
AuthorDate: Mon Oct 30 20:07:57 2023 +0800

    Improve Code Coverage for Base Executor (#35260)
---
 scripts/cov/core_coverage.py          |   1 -
 tests/executors/test_base_executor.py | 126 ++++++++++++++++++++++++++++++++++
 2 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/scripts/cov/core_coverage.py b/scripts/cov/core_coverage.py
index ba77c35fd1..38d001d2db 100644
--- a/scripts/cov/core_coverage.py
+++ b/scripts/cov/core_coverage.py
@@ -35,7 +35,6 @@ source_files = [
 
 files_not_fully_covered = [
     # executors
-    "airflow/executors/base_executor.py",
     "airflow/executors/debug_executor.py",
     "airflow/executors/executor_loader.py",
     "airflow/executors/local_executor.py",
diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py
index 7c21949b53..c990d88726 100644
--- a/tests/executors/test_base_executor.py
+++ b/tests/executors/test_base_executor.py
@@ -17,6 +17,8 @@
 # under the License.
 from __future__ import annotations
 
+import logging
+import sys
 from datetime import timedelta
 from unittest import mock
 
@@ -24,6 +26,9 @@ import pendulum
 import pytest
 import time_machine
 
+from airflow.callbacks.callback_requests import CallbackRequest
+from airflow.cli.cli_config import DefaultHelpParser, GroupCommand
+from airflow.cli.cli_parser import AirflowHelpFormatter
 from airflow.executors.base_executor import BaseExecutor, RunningRetryAttemptType
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
@@ -51,6 +56,11 @@ def test_is_production_default_value():
     assert BaseExecutor.is_production
 
 
+def test_infinite_slotspool():
+    executor = BaseExecutor(0)
+    assert executor.slots_available == sys.maxsize
+
+
 def test_get_task_log():
     executor = BaseExecutor()
     ti = TaskInstance(task=BaseOperator(task_id="dummy"))
@@ -83,6 +93,24 @@ def test_get_event_buffer():
     assert len(executor.event_buffer) == 0
 
 
+def test_fail_and_success():
+    executor = BaseExecutor()
+
+    date = timezone.utcnow()
+    try_number = 1
+    success_state = State.SUCCESS
+    fail_state = State.FAILED
+    key1 = TaskInstanceKey("my_dag1", "my_task1", date, try_number)
+    key2 = TaskInstanceKey("my_dag2", "my_task1", date, try_number)
+    key3 = TaskInstanceKey("my_dag2", "my_task2", date, try_number)
+    executor.fail(key1, fail_state)
+    executor.fail(key2, fail_state)
+    executor.success(key3, success_state)
+
+    assert len(executor.running) == 0
+    assert len(executor.get_event_buffer()) == 3
+
+
 @mock.patch("airflow.executors.base_executor.BaseExecutor.sync")
 @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
 @mock.patch("airflow.executors.base_executor.Stats.gauge")
@@ -99,6 +127,22 @@ def test_gauge_executor_metrics(mock_stats_gauge, mock_trigger_tasks, mock_sync)
     mock_stats_gauge.assert_has_calls(calls)
 
 
+@mock.patch("airflow.executors.base_executor.BaseExecutor.sync")
+@mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
+@mock.patch("airflow.executors.base_executor.Stats.gauge")
+def test_gauge_executor_with_infinite_pool_metrics(mock_stats_gauge, mock_trigger_tasks, mock_sync):
+    executor = BaseExecutor(0)
+    executor.heartbeat()
+    calls = [
+        mock.call("executor.open_slots", value=mock.ANY, tags={"status": "open", "name": "BaseExecutor"}),
+        mock.call("executor.queued_tasks", value=mock.ANY, tags={"status": "queued", "name": "BaseExecutor"}),
+        mock.call(
+            "executor.running_tasks", value=mock.ANY, tags={"status": "running", "name": "BaseExecutor"}
+        ),
+    ]
+    mock_stats_gauge.assert_has_calls(calls)
+
+
 def setup_dagrun(dag_maker):
     date = timezone.utcnow()
     start_date = date - timedelta(days=2)
@@ -200,10 +244,92 @@ def test_trigger_running_tasks(can_try_mock, dag_maker, can_try_num, change_stat
 def test_validate_airflow_tasks_run_command(dag_maker):
     dagrun = setup_dagrun(dag_maker)
     tis = dagrun.task_instances
+    print(f"command: {tis[0].command_as_list()}")
     dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(tis[0].command_as_list())
+    print(f"dag_id: {dag_id}, task_id: {task_id}")
     assert dag_id == dagrun.dag_id and task_id == tis[0].task_id
 
 
+@mock.patch(
+    "airflow.models.taskinstance.TaskInstance.generate_command",
+    return_value=["airflow", "tasks", "run", "--test_dag", "--test_task"],
+)
+def test_validate_airflow_tasks_run_command_with_complete_forloop(generate_command_mock, dag_maker):
+    dagrun = setup_dagrun(dag_maker)
+    tis = dagrun.task_instances
+    dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(tis[0].command_as_list())
+    assert dag_id is None and task_id is None
+
+
+@mock.patch(
+    "airflow.models.taskinstance.TaskInstance.generate_command", return_value=["airflow", "task", "run"]
+)
+def test_invalid_airflow_tasks_run_command(generate_command_mock, dag_maker):
+    dagrun = setup_dagrun(dag_maker)
+    tis = dagrun.task_instances
+    with pytest.raises(ValueError):
+        BaseExecutor.validate_airflow_tasks_run_command(tis[0].command_as_list())
+
+
+@mock.patch(
+    "airflow.models.taskinstance.TaskInstance.generate_command", return_value=["airflow", "tasks", "run"]
+)
+def test_empty_airflow_tasks_run_command(generate_command_mock, dag_maker):
+    dagrun = setup_dagrun(dag_maker)
+    tis = dagrun.task_instances
+    dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(tis[0].command_as_list())
+    assert dag_id is None, task_id is None
+
+
+def test_deprecate_validate_api(dag_maker):
+    dagrun = setup_dagrun(dag_maker)
+    tis = dagrun.task_instances
+    with pytest.warns(DeprecationWarning):
+        BaseExecutor.validate_command(tis[0].command_as_list())
+
+
+def test_debug_dump(caplog):
+    executor = BaseExecutor()
+    with caplog.at_level(logging.INFO):
+        executor.debug_dump()
+    assert "executor.queued" in caplog.text
+    assert "executor.running" in caplog.text
+    assert "executor.event_buffer" in caplog.text
+
+
+def test_base_executor_cannot_send_callback():
+    cbr = CallbackRequest("some_file_path_for_callback")
+    executor = BaseExecutor()
+    with pytest.raises(ValueError):
+        executor.send_callback(cbr)
+
+
+def test_parser_and_formatter_class():
+    executor = BaseExecutor()
+    parser = executor._get_parser()
+    assert isinstance(parser, DefaultHelpParser)
+    assert parser.formatter_class is AirflowHelpFormatter
+
+
+@mock.patch("airflow.cli.cli_parser._add_command")
+@mock.patch(
+    "airflow.executors.base_executor.BaseExecutor.get_cli_commands",
+    return_value=[
+        GroupCommand(
+            name="some_name",
+            help="some_help",
+            subcommands=["A", "B", "C"],
+            description="some_description",
+            epilog="some_epilog",
+        )
+    ],
+)
+def test_parser_add_command(mock_add_command, mock_get_cli_command):
+    executor = BaseExecutor()
+    executor._get_parser()
+    mock_add_command.assert_called_once()
+
+
 @pytest.mark.parametrize("loop_duration, total_tries", [(0.5, 12), (1.0, 7), (1.7, 4), (10, 2)])
 def test_running_retry_attempt_type(loop_duration, total_tries):
     """

Reply via email to