This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9bfa389e100 Enable pt011 rule 2 (#55749)
9bfa389e100 is described below
commit 9bfa389e10087c4caeffdd0581af109457ad05e8
Author: Xch1 <[email protected]>
AuthorDate: Fri Sep 19 18:33:17 2025 +0800
Enable pt011 rule 2 (#55749)
---
.../tests/unit/standard/decorators/test_python.py | 11 ++--
.../tests/unit/standard/operators/test_hitl.py | 6 +-
.../tests/unit/standard/operators/test_python.py | 10 ++--
.../standard/sensors/test_external_task_sensor.py | 67 +++++++++++++---------
.../trino/tests/unit/trino/assets/test_trino.py | 9 ++-
5 files changed, 58 insertions(+), 45 deletions(-)
diff --git a/providers/standard/tests/unit/standard/decorators/test_python.py
b/providers/standard/tests/unit/standard/decorators/test_python.py
index 80b7e4f2b56..aee460c5866 100644
--- a/providers/standard/tests/unit/standard/decorators/test_python.py
+++ b/providers/standard/tests/unit/standard/decorators/test_python.py
@@ -680,13 +680,11 @@ def test_mapped_decorator_shadow_context() -> None:
def print_info(message: str, run_id: str = "") -> None:
print(f"{run_id}: {message}")
- with pytest.raises(ValueError) as ctx:
+ with pytest.raises(ValueError, match=r"cannot call partial\(\) on task
context variable 'run_id'"):
print_info.partial(run_id="hi")
- assert str(ctx.value) == "cannot call partial() on task context variable
'run_id'"
- with pytest.raises(ValueError) as ctx:
+ with pytest.raises(ValueError, match=r"cannot call expand\(\) on task
context variable 'run_id'"):
print_info.expand(run_id=["hi", "there"])
- assert str(ctx.value) == "cannot call expand() on task context variable
'run_id'"
def test_mapped_decorator_wrong_argument() -> None:
@@ -702,9 +700,10 @@ def test_mapped_decorator_wrong_argument() -> None:
print_info.expand(wrong_name=["hi", "there"])
assert str(ct.value) == "expand() got an unexpected keyword argument
'wrong_name'"
- with pytest.raises(ValueError) as cv:
+ with pytest.raises(
+ ValueError, match=r"expand\(\) got an unexpected type 'str' for
keyword argument 'message'"
+ ):
print_info.expand(message="hi")
- assert str(cv.value) == "expand() got an unexpected type 'str' for keyword
argument 'message'"
def test_mapped_decorator():
diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py
b/providers/standard/tests/unit/standard/operators/test_hitl.py
index c7c03df683e..05f4fc4236d 100644
--- a/providers/standard/tests/unit/standard/operators/test_hitl.py
+++ b/providers/standard/tests/unit/standard/operators/test_hitl.py
@@ -316,7 +316,7 @@ class TestHITLOperator:
params={"input": 1},
)
- with pytest.raises(ValueError):
+ with pytest.raises(ValueError, match="not exists"):
hitl_op.execute_complete(
context={},
event={
@@ -335,7 +335,7 @@ class TestHITLOperator:
params={"input": 1},
)
- with pytest.raises(ValueError):
+ with pytest.raises(ValueError, match="no such key"):
hitl_op.execute_complete(
context={},
event={
@@ -442,7 +442,7 @@ class TestHITLOperator:
class TestApprovalOperator:
def test_init_with_options(self) -> None:
- with pytest.raises(ValueError):
+ with pytest.raises(ValueError, match="Passing options to
ApprovalOperator is not allowed."):
ApprovalOperator(
task_id="hitl_test",
subject="This is subject",
diff --git a/providers/standard/tests/unit/standard/operators/test_python.py
b/providers/standard/tests/unit/standard/operators/test_python.py
index f025d99a852..197a1b31414 100644
--- a/providers/standard/tests/unit/standard/operators/test_python.py
+++ b/providers/standard/tests/unit/standard/operators/test_python.py
@@ -985,9 +985,10 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
assert task.execute_callable() is False
def test_lambda(self):
- with pytest.raises(ValueError) as info:
+ with pytest.raises(
+ ValueError, match="PythonVirtualenvOperator only supports
functions for python_callable arg"
+ ):
PythonVirtualenvOperator(python_callable=lambda x: 4,
task_id=self.task_id)
- assert str(info.value) == "PythonVirtualenvOperator only supports
functions for python_callable arg"
def test_nonimported_as_arg(self):
def f(_):
@@ -1689,13 +1690,10 @@ class
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
system_site_packages=False,
)
- with pytest.raises(ValueError) as exc_info:
+ with pytest.raises(ValueError, match=rf"Invalid requirement
'{invalid_requirement}'"):
# Consume the generator to trigger parsing
list(op._iter_serializable_context_keys())
- msg = str(exc_info.value)
- assert f"Invalid requirement '{invalid_requirement}'" in msg
-
@mock.patch("airflow.providers.standard.operators.python.PythonVirtualenvOperator._prepare_venv")
@mock.patch(
"airflow.providers.standard.operators.python.PythonVirtualenvOperator._execute_python_callable_in_subprocess"
diff --git
a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
index 2c6d230e174..db8c1c01cd0 100644
---
a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
+++
b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
@@ -86,6 +86,8 @@ DEV_NULL = "/dev/null"
TASK_ID = "external_task_sensor_check"
EXTERNAL_DAG_ID = "child_dag" # DAG the external task sensor is waiting on
EXTERNAL_TASK_ID = "child_task" # Task the external task sensor is waiting on
+EXTERNAL_ID_AND_IDS_PROVIDE_ERROR = "Only one of `external_task_id` or
`external_task_ids` may be provided to ExternalTaskSensor; use external_task_id
or external_task_ids or external_task_group_id."
+EXTERNAL_IDS_AND_TASK_GROUP_ID_PROVIDE_ERROR = "Only one of
`external_task_group_id` or `external_task_ids` may be provided to
ExternalTaskSensor; use external_task_id or external_task_ids or
external_task_group_id."
@pytest.fixture(autouse=True)
@@ -203,7 +205,10 @@ class TestExternalTaskSensorV2:
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
def test_raise_with_external_task_sensor_task_id_and_task_ids(self):
- with pytest.raises(ValueError) as ctx:
+ with pytest.raises(
+ ValueError,
+ match=EXTERNAL_ID_AND_IDS_PROVIDE_ERROR,
+ ):
ExternalTaskSensor(
task_id="test_external_task_sensor_task_id_with_task_ids_failed_status",
external_dag_id=TEST_DAG_ID,
@@ -211,14 +216,9 @@ class TestExternalTaskSensorV2:
external_task_ids=TEST_TASK_ID,
dag=self.dag,
)
- assert (
- str(ctx.value) == "Only one of `external_task_id` or
`external_task_ids` may "
- "be provided to ExternalTaskSensor; "
- "use external_task_id or external_task_ids or
external_task_group_id."
- )
def test_raise_with_external_task_sensor_task_group_and_task_id(self):
- with pytest.raises(ValueError) as ctx:
+ with pytest.raises(ValueError,
match=EXTERNAL_IDS_AND_TASK_GROUP_ID_PROVIDE_ERROR):
ExternalTaskSensor(
task_id="test_external_task_sensor_task_group_with_task_id_failed_status",
external_dag_id=TEST_DAG_ID,
@@ -226,14 +226,9 @@ class TestExternalTaskSensorV2:
external_task_group_id=TEST_TASK_GROUP_ID,
dag=self.dag,
)
- assert (
- str(ctx.value) == "Only one of `external_task_group_id` or
`external_task_ids` may "
- "be provided to ExternalTaskSensor; "
- "use external_task_id or external_task_ids or
external_task_group_id."
- )
def test_raise_with_external_task_sensor_task_group_and_task_ids(self):
- with pytest.raises(ValueError) as ctx:
+ with pytest.raises(ValueError,
match=EXTERNAL_IDS_AND_TASK_GROUP_ID_PROVIDE_ERROR):
ExternalTaskSensor(
task_id="test_external_task_sensor_task_group_with_task_ids_failed_status",
external_dag_id=TEST_DAG_ID,
@@ -241,11 +236,6 @@ class TestExternalTaskSensorV2:
external_task_group_id=TEST_TASK_GROUP_ID,
dag=self.dag,
)
- assert (
- str(ctx.value) == "Only one of `external_task_group_id` or
`external_task_ids` may "
- "be provided to ExternalTaskSensor; "
- "use external_task_id or external_task_ids or
external_task_group_id."
- )
# by default i.e. check_existence=False, if task_group doesn't exist, the
sensor will run till timeout,
# this behaviour is similar to external_task_id doesn't exists
@@ -304,7 +294,10 @@ class TestExternalTaskSensorV2:
)
def test_external_task_sensor_wrong_failed_states(self):
- with pytest.raises(ValueError):
+ with pytest.raises(
+ ValueError,
+ match="Valid values for `allowed_states`, `skipped_states` and
`failed_states` when `external_task_id` or `external_task_ids` or
`external_task_group_id` is not `None`",
+ ):
ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
@@ -666,7 +659,10 @@ exit 0
def test_external_task_sensor_error_delta_and_fn(self):
self.add_time_sensor()
# Test that providing execution_delta and a function raises an error
- with pytest.raises(ValueError):
+ with pytest.raises(
+ ValueError,
+ match="Only one of `execution_delta` or `execution_date_fn` may be
provided to ExternalTaskSensor; not both.",
+ ):
ExternalTaskSensor(
task_id="test_external_task_sensor_check_delta",
external_dag_id=TEST_DAG_ID,
@@ -680,7 +676,10 @@ exit 0
def test_external_task_sensor_error_task_id_and_task_ids(self):
self.add_time_sensor()
# Test that providing execution_delta and a function raises an error
- with pytest.raises(ValueError):
+ with pytest.raises(
+ ValueError,
+ match=EXTERNAL_ID_AND_IDS_PROVIDE_ERROR,
+ ):
ExternalTaskSensor(
task_id="test_external_task_sensor_task_id_and_task_ids",
external_dag_id=TEST_DAG_ID,
@@ -711,7 +710,7 @@ exit 0
allowed_states=["success"],
dag=self.dag,
)
- with pytest.raises(ValueError):
+ with pytest.raises(ValueError, match="Duplicate task_ids"):
op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_catch_duplicate_task_ids_with_xcom_arg(self):
@@ -731,7 +730,7 @@ exit 0
dag=self.dag,
)
op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- with pytest.raises(ValueError):
+ with pytest.raises(ValueError, match="Duplicate task_ids"):
op2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_catch_duplicate_task_ids_with_multiple_xcom_args(self):
@@ -752,11 +751,14 @@ exit 0
dag=self.dag,
)
op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- with pytest.raises(ValueError):
+ with pytest.raises(ValueError, match="Duplicate task_ids"):
op2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_catch_invalid_allowed_states(self):
- with pytest.raises(ValueError):
+ with pytest.raises(
+ ValueError,
+ match="Valid values for `allowed_states`, `skipped_states` and
`failed_states`",
+ ):
ExternalTaskSensor(
task_id="test_external_task_sensor_check_1",
external_dag_id=TEST_DAG_ID,
@@ -765,7 +767,10 @@ exit 0
dag=self.dag,
)
- with pytest.raises(ValueError):
+ with pytest.raises(
+ ValueError,
+ match="Valid values for `allowed_states`, `skipped_states` and
`failed_states`",
+ ):
ExternalTaskSensor(
task_id="test_external_task_sensor_check_2",
external_dag_id=TEST_DAG_ID,
@@ -1185,7 +1190,10 @@ class TestExternalTaskSensorV3:
def test_external_task_sensor_invalid_combination(self, dag_maker):
"""Test that the sensor raises an error with invalid parameter
combinations."""
- with pytest.raises(ValueError):
+ with pytest.raises(
+ ValueError,
+ match=EXTERNAL_ID_AND_IDS_PROVIDE_ERROR,
+ ):
with dag_maker("test_external_task_sensor_invalid_combination"):
ExternalTaskSensor(
task_id="test_external_task_sensor_check",
@@ -1195,7 +1203,10 @@ class TestExternalTaskSensorV3:
)
def test_external_task_sensor_invalid_state(self, dag_maker):
- with pytest.raises(ValueError):
+ with pytest.raises(
+ ValueError,
+ match="Valid values for `allowed_states`, `skipped_states` and
`failed_states` when `external_task_id` or `external_task_ids` or
`external_task_group_id` is not `None",
+ ):
with dag_maker("test_external_task_sensor_invalid_state"):
ExternalTaskSensor(
task_id="test_external_task_sensor_check",
diff --git a/providers/trino/tests/unit/trino/assets/test_trino.py
b/providers/trino/tests/unit/trino/assets/test_trino.py
index 4ebea16d9fc..575ccca6d74 100644
--- a/providers/trino/tests/unit/trino/assets/test_trino.py
+++ b/providers/trino/tests/unit/trino/assets/test_trino.py
@@ -51,11 +51,16 @@ def test_sanitize_uri_pass(original: str, normalized: str)
-> None:
pytest.param("trino://", id="blank"),
pytest.param("trino:///catalog/schema/table", id="no-host"),
pytest.param("trino://example.com/catalog/table",
id="missing-component"),
- pytest.param("trino://example.com:abcd/catalog/schema/table",
id="non-port"),
pytest.param("trino://example.com/catalog/schema/table/column",
id="extra-component"),
],
)
def test_sanitize_uri_fail(value: str) -> None:
uri_i = urllib.parse.urlsplit(value)
- with pytest.raises(ValueError):
+ with pytest.raises(ValueError, match="URI format trino:// must contain"):
+ sanitize_uri(uri_i)
+
+
+def test_sanitize_uri_fail_non_port() -> None:
+ uri_i =
urllib.parse.urlsplit("trino://example.com:abcd/catalog/schema/table")
+ with pytest.raises(ValueError, match="Port could not be cast to integer
value as 'abcd'"):
sanitize_uri(uri_i)