This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a0c2071834 Partially enable PT012 rule (#38219)
a0c2071834 is described below

commit a0c2071834506f2342e9ac88c65003510a5efde8
Author: Andrey Anshin <[email protected]>
AuthorDate: Mon Mar 18 22:03:54 2024 +0400

    Partially enable PT012 rule (#38219)
    
    * Partially enable PT012 rule
    
    * Fixup 'tests/www/test_app.py' tests
    
    * Fixup 'tests/decorators/test_bash.py::TestBashDecorator::test_cwd_is_file' tests
---
 pyproject.toml                             | 77 +++++++++++++++++++++++-
 tests/cli/commands/test_task_command.py    |  3 +-
 tests/cli/test_cli_parser.py               |  7 ++-
 tests/core/test_stats.py                   | 13 ++--
 tests/dag_processing/test_job_runner.py    | 32 +++++-----
 tests/decorators/test_bash.py              | 48 ++++++++-------
 tests/decorators/test_setup_teardown.py    |  8 ++-
 tests/jobs/test_backfill_job.py            |  6 +-
 tests/jobs/test_triggerer_job.py           |  2 +-
 tests/models/test_baseoperator.py          | 49 +++++++--------
 tests/models/test_dag.py                   | 16 +++--
 tests/models/test_param.py                 |  4 +-
 tests/models/test_taskinstance.py          |  2 +-
 tests/models/test_taskmixin.py             |  1 -
 tests/operators/test_python.py             |  9 ++-
 tests/operators/test_subdag_operator.py    |  4 +-
 tests/operators/test_trigger_dagrun.py     |  7 +--
 tests/security/test_kerberos.py            | 10 ++--
 tests/sensors/test_external_task_sensor.py | 32 +++++-----
 tests/sensors/test_filesystem.py           |  9 ++-
 tests/serialization/test_serde.py          |  6 +-
 tests/utils/test_sqlalchemy.py             |  4 +-
 tests/utils/test_task_group.py             | 42 +++++++------
 tests/www/test_app.py                      | 96 +++++++++++-------------------
 24 files changed, 263 insertions(+), 224 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 97aceac2af..6114cf93d5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1360,7 +1360,6 @@ ignore = [
     "PT007", # Wrong type of values in @pytest.mark.parametrize
     "PT008", # Use return_value= instead of patching with lambda
     "PT011", # pytest.raises() is too broad, set the match parameter
-    "PT012", # [controversial rule] pytest.raises() block should contain a single simple statement.
     "PT018", # assertion should be broken down into multiple parts
     "PT019", # fixture without value is injected as parameter, use 
@pytest.mark.usefixtures instead
 ]
@@ -1405,7 +1404,7 @@ combine-as-imports = true
 "dev/breeze/tests/*" = ["TID253", "S101"]
 "tests/*" = ["D", "TID253", "S101"]
 "docker_tests/*" = ["D", "TID253", "S101"]
-"kubernetes_tests/*" = ["D", "TID253", "S101"]
+"kubernetes_tests/*" = ["D", "TID253", "S101", "PT012"]
 "helm_tests/*" = ["D", "TID253", "S101"]
 
 # All of the modules which have an extra license header (i.e. that we copy from another project) need to
@@ -1417,7 +1416,7 @@ combine-as-imports = true
 "tests/providers/elasticsearch/log/elasticmock/__init__.py" = ["E402"]
 "tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py" = 
["E402"]
 "tests/providers/openai/hooks/test_openai.py" = ["E402"]
-"tests/providers/openai/operators/test_openai.py" = ["E402"]
+"tests/providers/openai/operators/test_openai.py" = ["E402", "PT012"]
 "tests/providers/qdrant/hooks/test_qdrant.py" = ["E402"]
 "tests/providers/qdrant/operators/test_qdrant.py" = ["E402"]
 "tests/providers/snowflake/operators/test_snowflake_sql.py" = ["E402"]
@@ -1488,6 +1487,78 @@ combine-as-imports = true
 "airflow/providers/smtp/hooks/smtp.py" = ["D105"]
 "airflow/providers/tableau/hooks/tableau.py" = ["D105"]
 
+# All the test modules which do not follow PT012 yet
+"tests/providers/amazon/aws/hooks/test_base_aws.py" = ["PT012"]
+"tests/providers/amazon/aws/hooks/test_datasync.py" = ["PT012"]
+"tests/providers/amazon/aws/hooks/test_eks.py" = ["PT012"]
+"tests/providers/amazon/aws/hooks/test_redshift_data.py" = ["PT012"]
+"tests/providers/amazon/aws/hooks/test_s3.py" = ["PT012"]
+"tests/providers/amazon/aws/operators/test_emr_serverless.py" = ["PT012"]
+"tests/providers/amazon/aws/operators/test_redshift_data.py" = ["PT012"]
+"tests/providers/amazon/aws/sensors/test_glacier.py" = ["PT012"]
+"tests/providers/amazon/aws/sensors/test_glue.py" = ["PT012"]
+"tests/providers/amazon/aws/sensors/test_lambda_function.py" = ["PT012"]
+"tests/providers/amazon/aws/system/utils/test_helpers.py" = ["PT012"]
+"tests/providers/amazon/aws/transfers/test_redshift_to_s3.py" = ["PT012"]
+"tests/providers/amazon/aws/triggers/test_ecs.py" = ["PT012"]
+"tests/providers/amazon/aws/waiters/test_neptune.py" = ["PT012"]
+"tests/providers/apache/beam/hooks/test_beam.py" = ["PT012"]
+"tests/providers/apache/hive/hooks/test_hive.py" = ["PT012"]
+"tests/providers/apache/hive/sensors/test_named_hive_partition.py" = ["PT012"]
+"tests/providers/apache/spark/hooks/test_spark_sql.py" = ["PT012"]
+"tests/providers/celery/sensors/test_celery_queue.py" = ["PT012"]
+"tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py" = ["PT012"]
+"tests/providers/cncf/kubernetes/hooks/test_kubernetes.py" = ["PT012"]
+"tests/providers/cncf/kubernetes/operators/test_pod.py" = ["PT012"]
+"tests/providers/cncf/kubernetes/utils/test_k8s_resource_iterator.py" = ["PT012"]
+"tests/providers/cncf/kubernetes/utils/test_pod_manager.py" = ["PT012"]
+"tests/providers/common/sql/operators/test_sql.py" = ["PT012"]
+"tests/providers/databricks/hooks/test_databricks.py" = ["PT012"]
+"tests/providers/databricks/operators/test_databricks.py" = ["PT012"]
+"tests/providers/databricks/operators/test_databricks_repos.py" = ["PT012"]
+"tests/providers/databricks/sensors/test_databricks_partition.py" = ["PT012"]
+"tests/providers/datadog/sensors/test_datadog.py" = ["PT012"]
+"tests/providers/dbt/cloud/operators/test_dbt.py" = ["PT012"]
+"tests/providers/fab/auth_manager/cli_commands/test_user_command.py" = ["PT012"]
+"tests/providers/ftp/operators/test_ftp.py" = ["PT012"]
+"tests/providers/google/cloud/hooks/test_bigquery.py" = ["PT012"]
+"tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service.py" = ["PT012"]
+"tests/providers/google/cloud/hooks/test_dataflow.py" = ["PT012"]
+"tests/providers/google/cloud/hooks/test_dataprep.py" = ["PT012"]
+"tests/providers/google/cloud/hooks/test_kubernetes_engine.py" = ["PT012"]
+"tests/providers/google/cloud/hooks/test_pubsub.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_bigtable.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_cloud_sql.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_compute.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_datafusion.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_dataproc.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_functions.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_kubernetes_engine.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_mlengine.py" = ["PT012"]
+"tests/providers/google/cloud/operators/test_spanner.py" = ["PT012"]
+"tests/providers/google/cloud/sensors/test_datafusion.py" = ["PT012"]
+"tests/providers/google/cloud/sensors/test_dataproc.py" = ["PT012"]
+"tests/providers/google/cloud/sensors/test_gcs.py" = ["PT012"]
+"tests/providers/google/cloud/sensors/test_pubsub.py" = ["PT012"]
+"tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py" = ["PT012"]
+"tests/providers/google/cloud/utils/test_credentials_provider.py" = ["PT012"]
+"tests/providers/google/common/hooks/test_base_google.py" = ["PT012"]
+"tests/providers/jenkins/sensors/test_jenkins.py" = ["PT012"]
+"tests/providers/microsoft/azure/hooks/test_data_factory.py" = ["PT012"]
+"tests/providers/microsoft/azure/hooks/test_wasb.py" = ["PT012"]
+"tests/providers/microsoft/psrp/hooks/test_psrp.py" = ["PT012"]
+"tests/providers/oracle/hooks/test_oracle.py" = ["PT012"]
+"tests/providers/papermill/operators/test_papermill.py" = ["PT012"]
+"tests/providers/sftp/hooks/test_sftp.py" = ["PT012"]
+"tests/providers/sftp/operators/test_sftp.py" = ["PT012"]
+"tests/providers/sftp/sensors/test_sftp.py" = ["PT012"]
+"tests/providers/sftp/triggers/test_sftp.py" = ["PT012"]
+"tests/providers/ssh/hooks/test_ssh.py" = ["PT012"]
+"tests/providers/ssh/operators/test_ssh.py" = ["PT012"]
+"tests/providers/telegram/hooks/test_telegram.py" = ["PT012"]
+"tests/providers/telegram/operators/test_telegram.py" = ["PT012"]
+
 [tool.ruff.lint.flake8-tidy-imports]
 # Ban certain modules from being imported at module level, instead requiring
 # that they're imported lazily (e.g., within a function definition).
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index 24c6c950d2..d345075213 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -619,14 +619,13 @@ class TestCliTasks:
         task_states_for_dag_run should return an AirflowException when invalid dag id is passed
         """
         with pytest.raises(DagRunNotFound):
-            default_date2 = timezone.datetime(2016, 1, 9)
             task_command.task_states_for_dag_run(
                 self.parser.parse_args(
                     [
                         "tasks",
                         "states-for-dag-run",
                         "not_exists_dag",
-                        default_date2.isoformat(),
+                        timezone.datetime(2016, 1, 9).isoformat(),
                         "--output",
                         "json",
                     ]
diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py
index f8c1d53951..d105b01614 100644
--- a/tests/cli/test_cli_parser.py
+++ b/tests/cli/test_cli_parser.py
@@ -155,9 +155,9 @@ class TestCli:
                 args=[],
             )
         ]
+        reload(executor_loader)
         with pytest.raises(CliConflictError, match="test_command"):
             # force re-evaluation of cli commands (done in top level code)
-            reload(executor_loader)
             reload(cli_parser)
 
     def test_falsy_default_value(self):
@@ -229,6 +229,7 @@ class TestCli:
 
         with pytest.raises(argparse.ArgumentTypeError):
             cli_config.positive_int(allow_zero=False)("0")
+        with pytest.raises(argparse.ArgumentTypeError):
             cli_config.positive_int(allow_zero=True)("-1")
 
     @pytest.mark.parametrize(
@@ -285,8 +286,8 @@ class TestCli:
     def test_non_existing_directory_raises_when_metavar_is_dir_for_db_export_cleaned(self):
         """Test that the error message is correct when the directory does not 
exist."""
         with contextlib.redirect_stderr(StringIO()) as stderr:
+            parser = cli_parser.get_parser()
             with pytest.raises(SystemExit):
-                parser = cli_parser.get_parser()
                 parser.parse_args(["db", "export-archived", "--output-path", 
"/non/existing/directory"])
             error_msg = stderr.getvalue()
 
@@ -302,8 +303,8 @@ class TestCli:
     ):
         """Test that invalid choice raises for export-format in db 
export-cleaned command."""
         with contextlib.redirect_stderr(StringIO()) as stderr:
+            parser = cli_parser.get_parser()
             with pytest.raises(SystemExit):
-                parser = cli_parser.get_parser()
                 parser.parse_args(
                     ["db", "export-archived", "--export-format", 
export_format, "--output-path", "mydir"]
                 )
diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py
index ddd69ca32a..2c167d5d1e 100644
--- a/tests/core/test_stats.py
+++ b/tests/core/test_stats.py
@@ -124,15 +124,14 @@ class TestStats:
                 ("metrics", "statsd_on"): "True",
                 ("metrics", "statsd_custom_client_path"): 
f"{__name__}.InvalidCustomStatsd",
             }
-        ), pytest.raises(
-            AirflowConfigException,
-            match=re.escape(
-                "Your custom StatsD client must extend the statsd."
-                "StatsClient in order to ensure backwards compatibility."
-            ),
         ):
             importlib.reload(airflow.stats)
-            airflow.stats.Stats.incr("empty_key")
+            error_message = re.escape(
+                "Your custom StatsD client must extend the statsd."
+                "StatsClient in order to ensure backwards compatibility."
+            )
+            with pytest.raises(AirflowConfigException, match=error_message):
+                airflow.stats.Stats.incr("empty_key")
         importlib.reload(airflow.stats)
 
     def test_load_allow_list_validator(self):
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index b7b24bc1b2..d77f7692d3 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -1461,17 +1461,17 @@ class TestDagFileProcessorAgent:
         assert os.path.isfile(log_file_loc)
 
     def test_single_parsing_loop_no_parent_signal_conn(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent._process = Mock()
+        processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
-            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
-            processor_agent._process = Mock()
-            processor_agent._parent_signal_conn = None
             processor_agent.run_single_parsing_loop()
 
     def test_single_parsing_loop_no_process(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._parent_signal_conn = Mock()
+        processor_agent._process = None
         with pytest.raises(ValueError, match="Process not started"):
-            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
-            processor_agent._parent_signal_conn = Mock()
-            processor_agent._process = None
             processor_agent.run_single_parsing_loop()
 
     def test_single_parsing_loop_process_isnt_alive(self):
@@ -1498,15 +1498,15 @@ class TestDagFileProcessorAgent:
         assert retval == processor_agent._parent_signal_conn
 
     def test_get_callbacks_pipe_no_parent_signal_conn(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
-            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
-            processor_agent._parent_signal_conn = None
             processor_agent.get_callbacks_pipe()
 
     def test_wait_until_finished_no_parent_signal_conn(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
-            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
-            processor_agent._parent_signal_conn = None
             processor_agent.wait_until_finished()
 
     def test_wait_until_finished_poll_eof_error(self):
@@ -1519,9 +1519,9 @@ class TestDagFileProcessorAgent:
         assert ret_val is None
 
     def test_heartbeat_no_parent_signal_conn(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
-            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
-            processor_agent._parent_signal_conn = None
             processor_agent.heartbeat()
 
     def test_heartbeat_poll_eof_error(self):
@@ -1553,15 +1553,15 @@ class TestDagFileProcessorAgent:
             processor_agent._process_message.assert_called_with("testelem")
 
     def test_process_message_invalid_type(self):
+        message = "xyz"
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
         with pytest.raises(RuntimeError, match="Unexpected message received of 
type str"):
-            message = "xyz"
-            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
             processor_agent._process_message(message)
 
     def test_heartbeat_manager(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
-            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
-            processor_agent._parent_signal_conn = None
             processor_agent._heartbeat_manager()
 
     @mock.patch("airflow.utils.process_utils.reap_process_group")
diff --git a/tests/decorators/test_bash.py b/tests/decorators/test_bash.py
index 612e6a3327..1f2eb822d7 100644
--- a/tests/decorators/test_bash.py
+++ b/tests/decorators/test_bash.py
@@ -339,14 +339,15 @@ class TestBashDecorator:
 
         assert bash_task.operator.bash_command == NOTSET
 
+        dr = self.dag_maker.create_dagrun()
+        ti = dr.task_instances[0]
         with pytest.raises(AirflowException, match=f"Can not find the cwd: 
{cwd_path}"):
-            ti, _ = self.execute_task(bash_task)
-
-            self.validate_bash_command_rtif(ti, "echo")
+            ti.run()
+        assert ti.task.bash_command == "echo"
 
     def test_cwd_is_file(self, tmp_path):
         """Verify task failure for user-defined working directory that is 
actually a file."""
-        cwd_file = tmp_path / "test_file.sh"
+        cwd_file = tmp_path / "testfile.var.env"
         cwd_file.touch()
 
         with self.dag:
@@ -359,28 +360,32 @@ class TestBashDecorator:
 
         assert bash_task.operator.bash_command == NOTSET
 
+        dr = self.dag_maker.create_dagrun()
+        ti = dr.task_instances[0]
         with pytest.raises(AirflowException, match=f"The cwd {cwd_file} must 
be a directory"):
-            ti, _ = self.execute_task(bash_task)
-
-            self.validate_bash_command_rtif(ti, "echo")
+            ti.run()
+        assert ti.task.bash_command == "echo"
 
     def test_command_not_found(self):
         """Fail task if executed command is not found on path."""
-        with pytest.raises(
-            AirflowException, match="Bash command failed\\. The command 
returned a non-zero exit code 127\\."
-        ):
-            with self.dag:
 
-                @task.bash
-                def bash():
-                    return "set -e; something-that-isnt-on-path"
+        with self.dag:
 
-                bash_task = bash()
+            @task.bash
+            def bash():
+                return "set -e; something-that-isnt-on-path"
 
-            assert bash_task.operator.bash_command == NOTSET
+            bash_task = bash()
 
-            ti, _ = self.execute_task(bash_task)
-            self.validate_bash_command_rtif(ti, "set -e; 
something-that-isnt-on-path")
+        assert bash_task.operator.bash_command == NOTSET
+
+        dr = self.dag_maker.create_dagrun()
+        ti = dr.task_instances[0]
+        with pytest.raises(
+            AirflowException, match="Bash command failed\\. The command 
returned a non-zero exit code 127\\."
+        ):
+            ti.run()
+        assert ti.task.bash_command == "set -e; something-that-isnt-on-path"
 
     def test_multiple_outputs_true(self):
         """Verify setting `multiple_outputs` for a @task.bash-decorated 
function is ignored."""
@@ -474,7 +479,8 @@ class TestBashDecorator:
 
         assert bash_task.operator.bash_command == NOTSET
 
+        dr = self.dag_maker.create_dagrun()
+        ti = dr.task_instances[0]
         with pytest.raises(AirflowException):
-            ti, _ = self.execute_task(bash_task)
-
-            self.validate_bash_command_rtif(ti, f"{DEFAULT_DATE.date()}; exit 
1;")
+            ti.run()
+        assert ti.task.bash_command == f"{DEFAULT_DATE.date()}; exit 1;"
diff --git a/tests/decorators/test_setup_teardown.py 
b/tests/decorators/test_setup_teardown.py
index 80694f47cb..13451ba379 100644
--- a/tests/decorators/test_setup_teardown.py
+++ b/tests/decorators/test_setup_teardown.py
@@ -126,7 +126,7 @@ class TestSetupTearDownTask:
 
     def test_setup_taskgroup_decorator(self, dag_maker):
         with dag_maker():
-            with pytest.raises(
+            with pytest.raises(  # noqa: PT012, check decorators required more 
than one line
                 expected_exception=AirflowException,
                 match="Task groups cannot be marked as setup or teardown.",
             ):
@@ -144,7 +144,7 @@ class TestSetupTearDownTask:
 
     def test_teardown_taskgroup_decorator(self, dag_maker):
         with dag_maker():
-            with pytest.raises(
+            with pytest.raises(  # noqa: PT012, check decorators required more 
than one line
                 expected_exception=AirflowException,
                 match="Task groups cannot be marked as setup or teardown.",
             ):
@@ -989,7 +989,9 @@ class TestSetupTearDownTask:
         def teardowntask():
             print("teardown")
 
-        with pytest.raises(ValueError, match="All tasks in the list must be 
either setup or teardown tasks"):
+        with pytest.raises(  # noqa: PT012, check decorators required more 
than one line
+            ValueError, match="All tasks in the list must be either setup or 
teardown tasks"
+        ):
             with dag_maker():
                 with setuptask() << context_wrapper([teardowntask(), 
setuptask2()]):
                     mytask()
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 18f6a47ce0..ad80166147 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1046,10 +1046,10 @@ class TestBackfillJob:
 
         # raises backwards
         expected_msg = "You cannot backfill backwards because one or more 
tasks depend_on_past: test_dop_task"
+        executor = MockExecutor()
+        job = Job(executor=executor)
+        job_runner = BackfillJobRunner(job=job, dag=dag, run_backwards=True, 
**kwargs)
         with pytest.raises(AirflowException, match=expected_msg):
-            executor = MockExecutor()
-            job = Job(executor=executor)
-            job_runner = BackfillJobRunner(job=job, dag=dag, 
run_backwards=True, **kwargs)
             run_job(job=job, execute_callable=job_runner._execute)
 
     def test_cli_receives_delay_arg(self):
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 1cc69062e4..7ce4f94590 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -213,8 +213,8 @@ def test_capacity_decode():
         4 / 2,  # Resolves to a float, in addition to being just plain weird
     ]
     for input_str in variants:
+        job = Job()
         with pytest.raises(ValueError):
-            job = Job()
             TriggererJobRunner(job=job, capacity=input_str)
 
 
diff --git a/tests/models/test_baseoperator.py 
b/tests/models/test_baseoperator.py
index 989e772074..8513d2ebb0 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -310,43 +310,38 @@ class TestBaseOperator:
         assert result == expected_output
 
     def test_mapped_dag_slas_disabled_classic(self):
-        with pytest.raises(AirflowException, match="SLAs are unsupported with 
mapped tasks"):
-            with DAG(
-                "test-dag", start_date=DEFAULT_DATE, 
default_args=dict(sla=timedelta(minutes=30))
-            ) as dag:
+        class MyOp(BaseOperator):
+            def __init__(self, x, **kwargs):
+                self.x = x
+                super().__init__(**kwargs)
 
-                @dag.task
-                def get_values():
-                    return [0, 1, 2]
+            def execute(self, context):
+                print(self.x)
 
-                task1 = get_values()
+        with DAG("test-dag", start_date=DEFAULT_DATE, 
default_args=dict(sla=timedelta(minutes=30))) as dag:
 
-                class MyOp(BaseOperator):
-                    def __init__(self, x, **kwargs):
-                        self.x = x
-                        super().__init__(**kwargs)
-
-                    def execute(self, context):
-                        print(self.x)
+            @dag.task
+            def get_values():
+                return [0, 1, 2]
 
+            task1 = get_values()
+            with pytest.raises(AirflowException, match="SLAs are unsupported 
with mapped tasks"):
                 MyOp.partial(task_id="hi").expand(x=task1)
 
     def test_mapped_dag_slas_disabled_taskflow(self):
-        with pytest.raises(AirflowException, match="SLAs are unsupported with 
mapped tasks"):
-            with DAG(
-                "test-dag", start_date=DEFAULT_DATE, 
default_args=dict(sla=timedelta(minutes=30))
-            ) as dag:
+        with DAG("test-dag", start_date=DEFAULT_DATE, 
default_args=dict(sla=timedelta(minutes=30))) as dag:
 
-                @dag.task
-                def get_values():
-                    return [0, 1, 2]
+            @dag.task
+            def get_values():
+                return [0, 1, 2]
 
-                task1 = get_values()
+            task1 = get_values()
 
-                @dag.task
-                def print_val(x):
-                    print(x)
+            @dag.task
+            def print_val(x):
+                print(x)
 
+            with pytest.raises(AirflowException, match="SLAs are unsupported 
with mapped tasks"):
                 print_val.expand(x=task1)
 
     @pytest.mark.db_test
@@ -749,8 +744,8 @@ class TestBaseOperator:
         assert op1 in op2.upstream_list
 
     def test_set_xcomargs_dependencies_error_when_outside_dag(self):
+        op1 = BaseOperator(task_id="op1")
         with pytest.raises(AirflowException):
-            op1 = BaseOperator(task_id="op1")
             MockOperator(task_id="op2", arg1=op1.output)
 
     def test_invalid_trigger_rule(self):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 27a4be3ce6..ba60315212 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1533,21 +1533,19 @@ class TestDag:
 
     def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self):
         """Verify tasks with Duplicate task_id raises error"""
-        with pytest.raises(DuplicateTaskIdFound, match="Task id 't1' has 
already been added to the DAG"):
-            with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
-                op1 = EmptyOperator(task_id="t1")
-                op2 = BashOperator(task_id="t1", bash_command="sleep 1")
-                op1 >> op2
+        with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
+            op1 = EmptyOperator(task_id="t1")
+            with pytest.raises(DuplicateTaskIdFound, match="Task id 't1' has 
already been added to the DAG"):
+                BashOperator(task_id="t1", bash_command="sleep 1")
 
         assert dag.task_dict == {op1.task_id: op1}
 
     def test_duplicate_task_ids_not_allowed_without_dag_context_manager(self):
         """Verify tasks with Duplicate task_id raises error"""
+        dag = DAG("test_dag", start_date=DEFAULT_DATE)
+        op1 = EmptyOperator(task_id="t1", dag=dag)
         with pytest.raises(DuplicateTaskIdFound, match="Task id 't1' has 
already been added to the DAG"):
-            dag = DAG("test_dag", start_date=DEFAULT_DATE)
-            op1 = EmptyOperator(task_id="t1", dag=dag)
-            op2 = EmptyOperator(task_id="t1", dag=dag)
-            op1 >> op2
+            EmptyOperator(task_id="t1", dag=dag)
 
         assert dag.task_dict == {op1.task_id: op1}
 
diff --git a/tests/models/test_param.py b/tests/models/test_param.py
index 24a910f01e..89421f0dc2 100644
--- a/tests/models/test_param.py
+++ b/tests/models/test_param.py
@@ -171,8 +171,8 @@ class TestParam:
         p = Param(1.2, type="number")
         assert p.resolve() == 1.2
 
+        p = Param("42", type="number")
         with pytest.raises(ParamValidationError):
-            p = Param("42", type="number")
             p.resolve()
 
     def test_list_param(self):
@@ -212,8 +212,8 @@ class TestParam:
         p = S3Param("s3://my_bucket/my_path")
         assert p.resolve() == "s3://my_bucket/my_path"
 
+        p = S3Param("file://not_valid/s3_path")
         with pytest.raises(ParamValidationError):
-            p = S3Param("file://not_valid/s3_path")
             p.resolve()
 
     def test_value_saved(self):
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index 0d9d4df0f4..457d311923 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -402,8 +402,8 @@ class TestTaskInstance:
         test that try to create a task with pool_slots less than 1
         """
 
+        dag = DAG(dag_id="test_run_pooling_task")
         with pytest.raises(ValueError, match="pool slots .* cannot be less 
than 1"):
-            dag = DAG(dag_id="test_run_pooling_task")
             EmptyOperator(
                 task_id="test_run_pooling_task_op",
                 dag=dag,
diff --git a/tests/models/test_taskmixin.py b/tests/models/test_taskmixin.py
index 070483cfef..903f215ce7 100644
--- a/tests/models/test_taskmixin.py
+++ b/tests/models/test_taskmixin.py
@@ -166,7 +166,6 @@ def test_cannot_be_both_setup_and_teardown(dag_maker, 
type_):
                 ValueError, match=f"Cannot mark task 's1' as {second}; task is 
already a {first}."
             ):
                 getattr(s1, f"as_{second}")()
-                s1.as_teardown()
 
 
 def test_cannot_set_on_failure_fail_dagrun_unless_teardown_classic(dag_maker):
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index a6bf98ee16..3e6f5a8b9a 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -757,9 +757,8 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
         def f():
             raise Exception("Custom error message")
 
-        with pytest.raises(AirflowException) as e:
+        with pytest.raises(AirflowException, match="Custom error message"):
             self.run_as_task(f)
-            assert "Custom error message" in str(e)
 
     def test_string_args(self):
         def f():
@@ -913,11 +912,11 @@ class 
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
     def test_virtualenv_not_installed(self, importlib_mock, which_mock):
         which_mock.return_value = None
         importlib_mock.util.find_spec.return_value = None
-        with pytest.raises(AirflowException, match="requires virtualenv"):
 
-            def f():
-                pass
+        def f():
+            pass
 
+        with pytest.raises(AirflowException, match="requires virtualenv"):
             self.run_as_task(f)
 
     def test_add_dill(self):
diff --git a/tests/operators/test_subdag_operator.py 
b/tests/operators/test_subdag_operator.py
index 88bb701966..dd5157e891 100644
--- a/tests/operators/test_subdag_operator.py
+++ b/tests/operators/test_subdag_operator.py
@@ -229,9 +229,9 @@ class TestSubDagOperator:
             "execution_date": DEFAULT_DATE,
         }
 
+        subdag_task.pre_execute(context=context)
+        subdag_task.execute(context=context)
         with pytest.raises(AirflowException):
-            subdag_task.pre_execute(context=context)
-            subdag_task.execute(context=context)
             subdag_task.post_execute(context=context)
 
     def test_execute_skip_if_dagrun_success(self):
diff --git a/tests/operators/test_trigger_dagrun.py 
b/tests/operators/test_trigger_dagrun.py
index bad16517ea..a90f49926e 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -493,12 +493,11 @@ class TestDagRunOperator:
             poll_interval=20,
             states=["success", "failed"],
         )
-        with pytest.raises(AirflowException) as exception:
+        with pytest.raises(AirflowException, match="which is not in"):
             task.execute_complete(
                 context={},
                 event=trigger.serialize(),
             )
-            assert "which is not in" in str(exception)
 
     def 
test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure_2(self):
         """Test TriggerDagRunOperator  wait_for_completion dag run in failed 
state."""
@@ -528,7 +527,5 @@ class TestDagRunOperator:
             states=["success", "failed"],
         )
 
-        with pytest.raises(AirflowException) as exception:
+        with pytest.raises(AirflowException, match="failed with failed state"):
             task.execute_complete(context={}, event=trigger.serialize())
-
-            assert "failed with failed state" in str(exception)
diff --git a/tests/security/test_kerberos.py b/tests/security/test_kerberos.py
index 4989aeed8d..b424edb820 100644
--- a/tests/security/test_kerberos.py
+++ b/tests/security/test_kerberos.py
@@ -167,8 +167,8 @@ class TestKerberos:
         mock_subp.stdout = mock.MagicMock(name="stdout", 
**{"readlines.return_value": ["STDOUT"]})
         mock_subp.stderr = mock.MagicMock(name="stderr", 
**{"readlines.return_value": ["STDERR"]})
 
+        caplog.clear()
         with pytest.raises(SystemExit) as ctx:
-            caplog.clear()
             renew_from_kt(principal="test-principal", keytab="keytab")
         assert ctx.value.code == 1
 
@@ -216,8 +216,8 @@ class TestKerberos:
         mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
         mock_subprocess.call.return_value = 1
 
+        caplog.clear()
         with pytest.raises(SystemExit) as ctx:
-            caplog.clear()
             renew_from_kt(principal="test-principal", keytab="keytab")
         assert ctx.value.code == 1
 
@@ -264,9 +264,9 @@ class TestKerberos:
         ]
 
     def test_run_without_keytab(self, caplog):
-        with pytest.raises(SystemExit) as ctx:
-            with caplog.at_level(logging.WARNING, logger=kerberos.log.name):
-                caplog.clear()
+        with caplog.at_level(logging.WARNING, logger=kerberos.log.name):
+            caplog.clear()
+            with pytest.raises(SystemExit) as ctx:
                 kerberos.run(principal="test-principal", keytab=None)
         assert ctx.value.code == 0
         assert caplog.messages == ["Keytab renewer not starting, no keytab 
configured"]
diff --git a/tests/sensors/test_external_task_sensor.py 
b/tests/sensors/test_external_task_sensor.py
index 8d63a88486..557e4cf00d 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -237,15 +237,15 @@ class TestExternalTaskSensor:
     def test_external_task_group_not_exists_without_check_existence(self):
         self.add_time_sensor()
         self.add_dummy_task_group()
+        op = ExternalTaskSensor(
+            task_id="test_external_task_sensor_check",
+            external_dag_id=TEST_DAG_ID,
+            external_task_group_id="fake-task-group",
+            timeout=0.001,
+            dag=self.dag,
+            poke_interval=0.1,
+        )
         with pytest.raises(AirflowException, match="Sensor has timed out"):
-            op = ExternalTaskSensor(
-                task_id="test_external_task_sensor_check",
-                external_dag_id=TEST_DAG_ID,
-                external_task_group_id="fake-task-group",
-                timeout=0.001,
-                dag=self.dag,
-                poke_interval=0.1,
-            )
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
     def test_external_task_group_sensor_success(self):
@@ -320,9 +320,9 @@ class TestExternalTaskSensor:
             dag=self.dag,
         )
         error_message = rf"Some of the external tasks \['{TEST_TASK_ID}'\] in 
DAG {TEST_DAG_ID} failed\."
-        with pytest.raises(AirflowException, match=error_message):
-            with caplog.at_level(logging.INFO, logger=op.log.name):
-                caplog.clear()
+        with caplog.at_level(logging.INFO, logger=op.log.name):
+            caplog.clear()
+            with pytest.raises(AirflowException, match=error_message):
                 op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
         assert (
             f"Poking for tasks ['{TEST_TASK_ID}'] in dag {TEST_DAG_ID} on 
{DEFAULT_DATE.isoformat()} ... "
@@ -423,9 +423,9 @@ class TestExternalTaskSensor:
             rf"Some of the external tasks \['{TEST_TASK_ID}'\, 
\'{TEST_TASK_ID_ALTERNATE}\'] "
             rf"in DAG {TEST_DAG_ID} failed\."
         )
-        with pytest.raises(AirflowException, match=error_message):
-            with caplog.at_level(logging.INFO, logger=op.log.name):
-                caplog.clear()
+        with caplog.at_level(logging.INFO, logger=op.log.name):
+            caplog.clear()
+            with pytest.raises(AirflowException, match=error_message):
                 op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
         assert (
             f"Poking for tasks ['{TEST_TASK_ID}', '{TEST_TASK_ID_ALTERNATE}'] "
@@ -725,8 +725,8 @@ exit 0
             allowed_states=["success"],
             dag=self.dag,
         )
+        op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
         with pytest.raises(ValueError):
-            op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
             op2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
     def test_catch_duplicate_task_ids_with_multiple_xcom_args(self):
@@ -746,8 +746,8 @@ exit 0
             allowed_states=["success"],
             dag=self.dag,
         )
+        op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
         with pytest.raises(ValueError):
-            op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
             op2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
     def test_catch_invalid_allowed_states(self):
diff --git a/tests/sensors/test_filesystem.py b/tests/sensors/test_filesystem.py
index 9158e801ca..3c425a1b43 100644
--- a/tests/sensors/test_filesystem.py
+++ b/tests/sensors/test_filesystem.py
@@ -204,12 +204,12 @@ class TestFileSensor:
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
         shutil.rmtree(temp_dir)
 
-    def test_subdirectory_empty(self):
-        temp_dir = tempfile.mkdtemp()
-        tempfile.mkdtemp(dir=temp_dir)
+    def test_subdirectory_empty(self, tmp_path):
+        (tmp_path / "subdir").mkdir()
+
         task = FileSensor(
             task_id="test",
-            filepath=temp_dir,
+            filepath=tmp_path.as_posix(),
             fs_conn_id="fs_default",
             dag=self.dag,
             timeout=0,
@@ -219,7 +219,6 @@ class TestFileSensor:
 
         with pytest.raises(AirflowSensorTimeout):
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
-            shutil.rmtree(temp_dir)
 
     def test_task_defer(self):
         task = FileSensor(
diff --git a/tests/serialization/test_serde.py 
b/tests/serialization/test_serde.py
index 6091e30afb..ed9e1d1356 100644
--- a/tests/serialization/test_serde.py
+++ b/tests/serialization/test_serde.py
@@ -178,12 +178,12 @@ class TestSerDe:
         e = serialize(i)
         assert i == e
 
+        i = {CLASSNAME: "cannot"}
         with pytest.raises(AttributeError, match="^reserved"):
-            i = {CLASSNAME: "cannot"}
             serialize(i)
 
+        i = {SCHEMA_ID: "cannot"}
         with pytest.raises(AttributeError, match="^reserved"):
-            i = {SCHEMA_ID: "cannot"}
             serialize(i)
 
     def test_ser_namedtuple(self):
@@ -195,8 +195,8 @@ class TestSerDe:
         assert i == e
 
     def test_no_serializer(self):
+        i = Exception
         with pytest.raises(TypeError, match="^cannot serialize"):
-            i = Exception
             serialize(i)
 
     def test_ser_registered(self):
diff --git a/tests/utils/test_sqlalchemy.py b/tests/utils/test_sqlalchemy.py
index 1136a106cb..8ee0877dc3 100644
--- a/tests/utils/test_sqlalchemy.py
+++ b/tests/utils/test_sqlalchemy.py
@@ -155,8 +155,8 @@ class TestSqlAlchemyUtils:
             guard.commit()
 
             # Check the expected_commit is reset
-            with pytest.raises(RuntimeError):
-                self.session.execute(text("SELECT 1"))
+            self.session.execute(text("SELECT 1"))
+            with pytest.raises(RuntimeError, match="UNEXPECTED COMMIT"):
                 self.session.commit()
 
     def test_prohibit_commit_specific_session_only(self):
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index 5bd7577a56..359980ec09 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -636,34 +636,32 @@ def test_duplicate_group_id():
 
     execution_date = pendulum.parse("20200101")
 
-    with pytest.raises(DuplicateTaskIdFound, match=r".* 'task1' .*"):
-        with DAG("test_duplicate_group_id", start_date=execution_date):
-            _ = EmptyOperator(task_id="task1")
-            with TaskGroup("task1"):
-                pass
+    with DAG("test_duplicate_group_id", start_date=execution_date):
+        _ = EmptyOperator(task_id="task1")
+        with pytest.raises(DuplicateTaskIdFound, match=r".* 'task1' .*"), TaskGroup("task1"):
+            pass
 
-    with pytest.raises(DuplicateTaskIdFound, match=r".* 'group1' .*"):
-        with DAG("test_duplicate_group_id", start_date=execution_date):
-            _ = EmptyOperator(task_id="task1")
-            with TaskGroup("group1", prefix_group_id=False):
-                with TaskGroup("group1"):
-                    pass
+    with DAG("test_duplicate_group_id", start_date=execution_date):
+        _ = EmptyOperator(task_id="task1")
+        with TaskGroup("group1", prefix_group_id=False):
+            with pytest.raises(DuplicateTaskIdFound, match=r".* 'group1' .*"), TaskGroup("group1"):
+                pass
 
-    with pytest.raises(DuplicateTaskIdFound, match=r".* 'group1' .*"):
-        with DAG("test_duplicate_group_id", start_date=execution_date):
-            with TaskGroup("group1", prefix_group_id=False):
+    with DAG("test_duplicate_group_id", start_date=execution_date):
+        with TaskGroup("group1", prefix_group_id=False):
+            with pytest.raises(DuplicateTaskIdFound, match=r".* 'group1' .*"):
                 _ = EmptyOperator(task_id="group1")
 
-    with pytest.raises(DuplicateTaskIdFound, match=r".* 
'group1.downstream_join_id' .*"):
-        with DAG("test_duplicate_group_id", start_date=execution_date):
-            _ = EmptyOperator(task_id="task1")
-            with TaskGroup("group1"):
+    with DAG("test_duplicate_group_id", start_date=execution_date):
+        _ = EmptyOperator(task_id="task1")
+        with TaskGroup("group1"):
+            with pytest.raises(DuplicateTaskIdFound, match=r".* 'group1.downstream_join_id' .*"):
                 _ = EmptyOperator(task_id="downstream_join_id")
 
-    with pytest.raises(DuplicateTaskIdFound, match=r".* 
'group1.upstream_join_id' .*"):
-        with DAG("test_duplicate_group_id", start_date=execution_date):
-            _ = EmptyOperator(task_id="task1")
-            with TaskGroup("group1"):
+    with DAG("test_duplicate_group_id", start_date=execution_date):
+        _ = EmptyOperator(task_id="task1")
+        with TaskGroup("group1"):
+            with pytest.raises(DuplicateTaskIdFound, match=r".* 'group1.upstream_join_id' .*"):
                 _ = EmptyOperator(task_id="upstream_join_id")
 
 
diff --git a/tests/www/test_app.py b/tests/www/test_app.py
index 571aca79cc..1e7bd67c9a 100644
--- a/tests/www/test_app.py
+++ b/tests/www/test_app.py
@@ -18,8 +18,6 @@
 from __future__ import annotations
 
 import hashlib
-import re
-import runpy
 import sys
 from datetime import timedelta
 from unittest import mock
@@ -89,24 +87,9 @@ class TestApp:
         assert b"success" == response.get_data()
         assert response.status_code == 200
 
-    @pytest.mark.parametrize(
-        "base_url, expected_exception",
-        [
-            ("http://localhost:8080/internal-client", None),
-            (
-                "http://localhost:8080/internal-client/",
-                AirflowConfigException("webserver.base_url conf cannot have a trailing slash."),
-            ),
-        ],
-    )
     @dont_initialize_flask_app_submodules
-    def test_should_respect_base_url_ignore_proxy_headers(self, base_url, 
expected_exception):
-        with conf_vars({("webserver", "base_url"): base_url}):
-            if expected_exception:
-                with pytest.raises(expected_exception.__class__, 
match=re.escape(str(expected_exception))):
-                    app = application.cached_app(testing=True)
-                    app.url_map.add(Rule("/debug", endpoint="debug"))
-                return
+    def test_should_respect_base_url_ignore_proxy_headers(self):
+        with conf_vars({("webserver", "base_url"): "http://localhost:8080/internal-client"}):
             app = application.cached_app(testing=True)
             app.url_map.add(Rule("/debug", endpoint="debug"))
 
@@ -140,16 +123,14 @@ class TestApp:
         assert b"success" == response.get_data()
         assert response.status_code == 200
 
-    @pytest.mark.parametrize(
-        "base_url, expected_exception",
-        [
-            ("http://localhost:8080/internal-client", None),
-            (
-                "http://localhost:8080/internal-client/",
-                AirflowConfigException("webserver.base_url conf cannot have a trailing slash."),
-            ),
-        ],
-    )
+    @dont_initialize_flask_app_submodules
+    def test_base_url_contains_trailing_slash(self):
+        with conf_vars({("webserver", "base_url"): "http://localhost:8080/internal-client/"}):
+            with pytest.raises(
+                AirflowConfigException, match="webserver.base_url conf cannot have a trailing slash"
+            ):
+                application.cached_app(testing=True)
+
     @conf_vars(
         {
             ("webserver", "enable_proxy_fix"): "True",
@@ -161,15 +142,8 @@ class TestApp:
         }
     )
     @dont_initialize_flask_app_submodules
-    def 
test_should_respect_base_url_when_proxy_fix_and_base_url_is_set_up_but_headers_missing(
-        self, base_url, expected_exception
-    ):
-        with conf_vars({("webserver", "base_url"): base_url}):
-            if expected_exception:
-                with pytest.raises(expected_exception.__class__, 
match=re.escape(str(expected_exception))):
-                    app = application.cached_app(testing=True)
-                    app.url_map.add(Rule("/debug", endpoint="debug"))
-                return
+    def test_should_respect_base_url_when_proxy_fix_and_base_url_is_set_up_but_headers_missing(self):
+        with conf_vars({("webserver", "base_url"): "http://localhost:8080/internal-client"}):
             app = application.cached_app(testing=True)
             app.url_map.add(Rule("/debug", endpoint="debug"))
 
@@ -262,27 +236,28 @@ class TestApp:
         assert app.config["SESSION_COOKIE_SAMESITE"] == "Lax"
 
     @pytest.mark.parametrize(
-        "hash_method, result, exception",
+        "hash_method, result",
         [
-            ("sha512", hashlib.sha512, None),
-            ("sha384", hashlib.sha384, None),
-            ("sha256", hashlib.sha256, None),
-            ("sha224", hashlib.sha224, None),
-            ("sha1", hashlib.sha1, None),
-            ("md5", hashlib.md5, None),
-            (None, hashlib.md5, None),
-            ("invalid", None, AirflowConfigException),
+            pytest.param("sha512", hashlib.sha512, id="sha512"),
+            pytest.param("sha384", hashlib.sha384, id="sha384"),
+            pytest.param("sha256", hashlib.sha256, id="sha256"),
+            pytest.param("sha224", hashlib.sha224, id="sha224"),
+            pytest.param("sha1", hashlib.sha1, id="sha1"),
+            pytest.param("md5", hashlib.md5, id="md5"),
+            pytest.param(None, hashlib.md5, id="default"),
         ],
     )
-    @dont_initialize_flask_app_submodules
-    def test_should_respect_caching_hash_method(self, hash_method, result, 
exception):
+    @dont_initialize_flask_app_submodules(skip_all_except=["init_auth_manager"])
+    def test_should_respect_caching_hash_method(self, hash_method, result):
         with conf_vars({("webserver", "caching_hash_method"): hash_method}):
-            if exception:
-                with pytest.raises(expected_exception=exception):
-                    app = application.cached_app(testing=True)
-            else:
-                app = application.cached_app(testing=True)
-                assert next(iter(app.extensions["cache"])).cache._hash_method 
== result
+            app = application.cached_app(testing=True)
+            assert next(iter(app.extensions["cache"])).cache._hash_method == result
+
+    @dont_initialize_flask_app_submodules
+    def test_should_respect_caching_hash_method_invalid(self):
+        with conf_vars({("webserver", "caching_hash_method"): "invalid"}):
+            with pytest.raises(expected_exception=AirflowConfigException):
+                application.cached_app(testing=True)
 
 
 class TestFlaskCli:
@@ -290,11 +265,12 @@ class TestFlaskCli:
     def test_flask_cli_should_display_routes(self, capsys):
         with mock.patch.dict("os.environ", 
FLASK_APP="airflow.www.app:cached_app"), mock.patch.object(
             sys, "argv", ["flask", "routes"]
-        ), pytest.raises(SystemExit):
-            from flask import __main__
-
-            # We are not using run_module because of 
https://github.com/pytest-dev/pytest/issues/9007
-            runpy.run_path(__main__.__file__, run_name="main")
+        ):
+            # Importing flask.__main__ while sys.argv is mocked to ["flask", "routes"]
+            # invokes the ``flask routes`` command.
+            with pytest.raises(SystemExit) as ex_ctx:
+                from flask import __main__  # noqa: F401
+            assert ex_ctx.value.code == 0
 
         output = capsys.readouterr()
         assert "/login/" in output.out

Reply via email to