This is an automated email from the ASF dual-hosted git repository. turbaszek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new 369e637 Add query count test for LocalTaskJob (#8922) 369e637 is described below commit 369e6377b4c9a267c85c82211fd72282092b6fa8 Author: Tomek Urbaszek <turbas...@gmail.com> AuthorDate: Thu May 28 07:24:46 2020 +0200 Add query count test for LocalTaskJob (#8922) * Add query count test for LocalTaskJob * fixup! Add query count test for LocalTaskJob --- tests/jobs/test_local_task_job.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 25a181a..9a706ed 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -20,6 +20,8 @@ import multiprocessing import os import time import unittest +import uuid +from unittest import mock import pytest from mock import patch @@ -38,6 +40,7 @@ from airflow.utils.net import get_hostname from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.timeout import timeout +from tests.test_utils.asserts import assert_queries_count from tests.test_utils.db import clear_db_runs from tests.test_utils.mock_executor import MockExecutor @@ -406,3 +409,30 @@ class TestLocalTaskJob(unittest.TestCase): self.assertTrue(data['called']) process.join(timeout=10) self.assertFalse(process.is_alive()) + + +@pytest.fixture() +def clean_db_helper(): + yield + clear_db_runs() + + +@pytest.mark.usefixtures("clean_db_helper") +class TestLocalTaskJobPerformance: + @pytest.mark.parametrize("return_codes", [[0], 9 * [None] + [0]]) # type: ignore + @mock.patch("airflow.jobs.local_task_job.get_task_runner") + def test_number_of_queries_single_loop(self, mock_get_task_runner, return_codes): + unique_prefix = str(uuid.uuid4()) + dag = DAG(dag_id=f'{unique_prefix}_test_number_of_queries', start_date=DEFAULT_DATE) + task = DummyOperator(task_id='test_state_succeeded1', dag=dag) + + dag.clear() + dag.create_dagrun(run_id=unique_prefix, state=State.NONE) + + ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) + + mock_get_task_runner.return_value.return_code.side_effects = return_codes + + job = LocalTaskJob(task_instance=ti, executor=MockExecutor()) + with assert_queries_count(13): + job.run()