This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ab2851bc745 Return a 422 when the database rejects an API payload (#66888)
ab2851bc745 is described below
commit ab2851bc7452f34caad35de6b68c082725b53225
Author: Stefan Wang <[email protected]>
AuthorDate: Fri Jun 12 14:46:29 2026 -0700
Return a 422 when the database rejects an API payload (#66888)
* Translate DataError to 413/422 instead of generic 500
Triggering a DAG run with an oversized 'conf' payload (and other
DB-rejected writes across the API surface) currently produces a generic
500. The SQL error surfaces deep in SQLAlchemy as
(1406, "Data too long for column 'conf' at row 1") on MySQL, the
caller has no signal that payload size was the cause, and every write
endpoint that touches a length-capped column has the same shape today
(Connection.extra, Variable.val, XCom.value, TaskInstance.note, HITL
fields, etc).
Add a single FastAPI exception handler for sqlalchemy.exc.DataError on
both the public REST API and the execution API. 'Data too long' /
'too large' / 'too big' errors map to 413 Content Too Large; other
DataErrors (out-of-range, numeric overflow) map to 422. The response
body carries the original DB error and an actionable hint pointing at
either reducing the payload or widening the column type on MySQL.
Every existing and future write endpoint inherits the translation
automatically. Postgres deployments never hit it (JSONB has no length
cap); MySQL deployments get a clear 4xx + remediation hint instead
of a generic 500.
Closes #66779
Signed-off-by: 1fanwang <[email protected]>
* Rename newsfragment to PR number
Signed-off-by: 1fanwang <[email protected]>
* Fix mypy errors on execution_api handler registration + test indexing
Pass DataError directly to add_exception_handler instead of via the
BaseErrorHandler.exception_cls attribute (typed as instance T, not
type[T]) so the call type-checks against Starlette's expected
type[Exception]. The variance issue between Callable[Request, DataError]
and Callable[Request, Exception] is silenced with a type-ignore matching
the existing pattern used in the core_api ERROR_HANDLERS loop.
In the new TestDataErrorHandler tests, extract HTTPException.detail
into a typed dict before subscripting so mypy stops inferring it as str.
Signed-off-by: 1fanwang <[email protected]>
* Simplify DataError handling per review
Drop the 413/'too long' marker special-case and always return 422 to
match the rest of the app. Add `statement` to the response detail so
the shape mirrors the unique-constraint handler. Drop the manual
handler block in the execution API since `init_error_handlers` already
loops over `ERROR_HANDLERS` for both apps. Rename `_DataErrorHandler`
to `DataErrorHandler`. Remove the newsfragment.
* Hide DataError API response internals behind expose_stacktrace
The DataError handler now mirrors _UniqueConstraintErrorHandler: it logs
the statement and underlying database error under a lookup id and only
echoes them back to the caller when [api] expose_stacktrace is set. By
default the response carries an opaque message plus the id, so column
names and rejected values are no longer leaked to API clients.
Tests cover both expose_stacktrace False (internals hidden) and True
(internals exposed) across the dialect error shapes.
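
The hide-or-expose behaviour described above can be sketched without FastAPI or SQLAlchemy. This is a minimal illustration, not the committed handler: `StandInDataError`, `build_detail`, and the `expose` parameter are hypothetical names. Only the response keys, the "hidden" placeholder, and the message wording are taken from the diff in this email.

```python
class StandInDataError(Exception):
    """Hypothetical stand-in for sqlalchemy.exc.DataError (statement and orig only)."""

    def __init__(self, statement: str, orig: Exception) -> None:
        super().__init__(str(orig))
        self.statement = statement
        self.orig = orig


def build_detail(exc: StandInDataError, exception_id: str, expose: bool) -> dict[str, str]:
    """Build the 422 response detail, echoing DB internals only when `expose` is true."""
    if expose:
        # Assumed to correspond to [api] expose_stacktrace being enabled.
        statement = str(exc.statement)
        orig_error = str(exc.orig)
        message = f"Error with id {exception_id}, statement: {exc.statement}"
    else:
        # Default: the caller only gets an id to look up in the server logs.
        statement = "hidden"
        orig_error = "hidden"
        message = (
            "Serious error when handling your request. Check logs for more details - "
            f"you will find it in api server when you look for ID {exception_id}"
        )
    return {
        "reason": "Value rejected by database",
        "statement": statement,
        "orig_error": orig_error,
        "message": message,
    }
```

Keeping the same keys in both modes means a client can parse the detail without knowing how the server is configured.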
* Stop task-instance execution routes from masking DataError
ti_run and ti_update_state wrap their DB writes in catches broader than
DataError: SQLAlchemyError becomes an opaque 500, and a bare Exception in
ti_update_state silently marks the task FAILED and returns 204. Those fire
before the app-level DataErrorHandler can translate the rejection, so an
oversized field (note, rendered_map_index, next_kwargs, ...) either returns
a generic 500 or leaves the worker believing the update succeeded while the
server marked the task FAILED.
Re-raise DataError ahead of each broad catch so it reaches the handler and
the caller gets the actionable 422. The fallbacks for other unexpected
errors are unchanged.
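
The ordering matters because Python picks the first matching `except` clause. A minimal sketch of the pattern follows, using hypothetical stand-in classes rather than the real SQLAlchemy and FastAPI types. `write_with_fallback` and `OpaqueServerError` are illustrative names that do not appear in the commit.

```python
class StandInSQLAlchemyError(Exception):
    """Hypothetical stand-in for sqlalchemy.exc.SQLAlchemyError."""


class StandInDataError(StandInSQLAlchemyError):
    """Hypothetical stand-in for sqlalchemy.exc.DataError, a subclass of the above."""


class OpaqueServerError(Exception):
    """Hypothetical stand-in for the generic 500 the routes raise."""


def write_with_fallback(write):
    """Run a DB write; let data rejections propagate, wrap every other DB error."""
    try:
        return write()
    except StandInDataError:
        # Narrow clause first: re-raise so an outer, app-level handler sees it.
        raise
    except StandInSQLAlchemyError as err:
        # Broad fallback for all remaining database errors.
        raise OpaqueServerError("Database error occurred") from err
```

If the two clauses were swapped, the broad clause would catch the data rejection first and the narrow one would never run.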
* Tighten DataError docstring and re-raise comments
Drop the per-dialect enumeration in the handler docstring and collapse the
multi-line re-raise comments to one line each.
* Test DataError handling in the task-instance execution routes
Add MySQL/Postgres-only route tests (SQLite ignores varchar limits): an
oversized field the database rejects returns 422 from ti_run and
ti_update_state, and the deferred-state path no longer silently marks the
TI FAILED.
---------
Signed-off-by: 1fanwang <[email protected]>
---
.../src/airflow/api_fastapi/common/exceptions.py | 54 +++++++++-
.../execution_api/routes/task_instances.py | 11 ++-
.../unit/api_fastapi/common/test_exceptions.py | 110 ++++++++++++++++++++-
.../versions/head/test_task_instances.py | 79 +++++++++++++++
4 files changed, 249 insertions(+), 5 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/exceptions.py b/airflow-core/src/airflow/api_fastapi/common/exceptions.py
index 12d2486253c..efe775e544e 100644
--- a/airflow-core/src/airflow/api_fastapi/common/exceptions.py
+++ b/airflow-core/src/airflow/api_fastapi/common/exceptions.py
@@ -24,7 +24,7 @@ from enum import Enum
from typing import Generic, TypeVar
from fastapi import HTTPException, Request, status
-from sqlalchemy.exc import IntegrityError
+from sqlalchemy.exc import DataError, IntegrityError
from airflow.configuration import conf
from airflow.exceptions import DeserializationError
@@ -108,6 +108,52 @@ class _UniqueConstraintErrorHandler(BaseErrorHandler[IntegrityError]):
return False
+class DataErrorHandler(BaseErrorHandler[DataError]):
+ """
+ Translate ``sqlalchemy.exc.DataError`` into a 422.
+
+ The database rejected a value that passed Pydantic validation (too long, out
+ of range, or the wrong type for its column), so it is a client error, not a
+ 500. The statement and underlying DB error are logged under a lookup id and
+ returned to the caller only when ``[api] expose_stacktrace`` is set, mirroring
+ ``_UniqueConstraintErrorHandler``.
+ """
+
+ def __init__(self):
+ super().__init__(DataError)
+
+ def exception_handler(self, request: Request, exc: DataError):
+ """Handle DataError exception."""
+ exception_id = get_random_string()
+ stacktrace = ""
+ for tb in traceback.format_tb(exc.__traceback__):
+ stacktrace += tb
+
+ log_message = f"Error with id {exception_id}, statement: {exc.statement}\n{stacktrace}"
+ log.error(log_message)
+ if conf.get("api", "expose_stacktrace") == "True":
+ message = log_message
+ statement = str(exc.statement)
+ orig_error = str(exc.orig)
+ else:
+ message = (
+ "Serious error when handling your request. Check logs for more details - "
+ f"you will find it in api server when you look for ID {exception_id}"
+ )
+ statement = "hidden"
+ orig_error = "hidden"
+
+ raise HTTPException(
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ detail={
+ "reason": "Value rejected by database",
+ "statement": statement,
+ "orig_error": orig_error,
+ "message": message,
+ },
+ )
+
+
class DagErrorHandler(BaseErrorHandler[DeserializationError]):
"""Handler for Dag related errors."""
@@ -122,4 +168,8 @@ class DagErrorHandler(BaseErrorHandler[DeserializationError]):
)
-ERROR_HANDLERS: list[BaseErrorHandler] = [_UniqueConstraintErrorHandler(), DagErrorHandler()]
+ERROR_HANDLERS: list[BaseErrorHandler] = [
+ _UniqueConstraintErrorHandler(),
+ DataErrorHandler(),
+ DagErrorHandler(),
+]
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 8a91614e09a..7a53a9ddf59 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -35,7 +35,7 @@ from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapProp
from pydantic import JsonValue
from sqlalchemy import and_, func, or_, tuple_, update
from sqlalchemy.engine import CursorResult
-from sqlalchemy.exc import NoResultFound, SQLAlchemyError
+from sqlalchemy.exc import DataError, NoResultFound, SQLAlchemyError
from sqlalchemy.orm import joinedload
from sqlalchemy.sql import select
from structlog.contextvars import bind_contextvars
@@ -314,6 +314,9 @@ def ti_run(
context.next_method = ti.next_method
context.next_kwargs = ti.next_kwargs
context.start_date = ti.start_date
+ except DataError:
+ # Let the app-level DataErrorHandler return a 422 (not the opaque 500 below).
+ raise
except SQLAlchemyError:
log.exception("Error marking Task Instance state as running")
raise HTTPException(
@@ -438,6 +441,9 @@ def ti_update_state(
dag_id=dag_id,
dag_bag=dag_bag,
)
+ except DataError:
+ # Let DataErrorHandler return a 422 instead of silently marking the TI FAILED below.
+ raise
except Exception:
# Set a task to failed in case any unexpected exception happened during task state update
log.exception(
@@ -473,6 +479,9 @@ def ti_update_state(
extra=json.dumps({"host_name": hostname}) if hostname else None,
)
)
+ except DataError:
+ # Let DataErrorHandler return a 422 (not the opaque 500 below).
+ raise
except SQLAlchemyError as e:
log.error("Error updating Task Instance state", error=str(e))
raise HTTPException(
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py b/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
index fb97f0ac323..7f470e2f019 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
@@ -21,13 +21,16 @@ from typing import TYPE_CHECKING
from unittest.mock import Mock, patch
import pytest
-from fastapi import HTTPException, status
+from fastapi import FastAPI, HTTPException, status
+from fastapi.testclient import TestClient
from sqlalchemy import select, update
-from sqlalchemy.exc import IntegrityError
+from sqlalchemy.exc import DataError, IntegrityError
from sqlalchemy.orm import Session
from airflow.api_fastapi.common.exceptions import (
+ ERROR_HANDLERS,
DagErrorHandler,
+ DataErrorHandler,
_DatabaseDialect,
_UniqueConstraintErrorHandler,
)
@@ -392,6 +395,109 @@ class TestUniqueConstraintErrorHandler:
assert exeinfo_response_error.value.detail == expected_exception.detail
+class TestDataErrorHandler:
+ handler = DataErrorHandler()
+
+ _STATEMENT = "INSERT INTO dag_run (conf) VALUES (?)"
+
+ # One representative ``orig`` message per dialect / rejection class. The handler is
+ # dialect-agnostic (it translates every DataError the same way), so the value only
+ # needs to round-trip through ``orig_error`` when stacktrace exposure is on.
+ _ORIG_MSGS = [
+ pytest.param(
+ "(1406, \"Data too long for column 'conf' at row 1\")",
+ id="mysql-1406-data-too-long",
+ ),
+ pytest.param(
+ "value too long for type character varying(250)",
+ id="postgres-value-too-long",
+ ),
+ pytest.param(
+ "string or blob too big",
+ id="sqlite-blob-too-big",
+ ),
+ pytest.param(
+ "(1264, \"Out of range value for column 'slots' at row 1\")",
+ id="mysql-1264-out-of-range",
+ ),
+ pytest.param(
+ "numeric field overflow",
+ id="postgres-numeric-field-overflow",
+ ),
+ pytest.param(
+ "(1366, \"Incorrect integer value: 'abc' for column 'slots' at row 1\")",
+ id="mysql-1366-incorrect-value",
+ ),
+ pytest.param(
+ "invalid input syntax for type integer",
+ id="postgres-invalid-input-syntax",
+ ),
+ ]
+
+ @staticmethod
+ def _make_data_error(orig_msg: str) -> DataError:
+ return DataError(
+ statement=TestDataErrorHandler._STATEMENT,
+ params={},
+ orig=Exception(orig_msg),
+ )
+
+ @pytest.mark.parametrize("orig_msg", _ORIG_MSGS)
+ @patch("airflow.api_fastapi.common.exceptions.get_random_string", return_value=MOCKED_ID)
+ @conf_vars({("api", "expose_stacktrace"): "False"})
+ def test_data_error_hides_db_internals_without_stacktrace(
+ self,
+ mock_get_random_string,
+ orig_msg: str,
+ ) -> None:
+ exc = self._make_data_error(orig_msg)
+ with pytest.raises(HTTPException) as exc_info:
+ self.handler.exception_handler(Mock(), exc)
+ assert exc_info.value.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
+ assert exc_info.value.detail == {
+ "reason": "Value rejected by database",
+ "statement": "hidden",
+ "orig_error": "hidden",
+ "message": MESSAGE,
+ }
+
+ @pytest.mark.parametrize("orig_msg", _ORIG_MSGS)
+ @patch("airflow.api_fastapi.common.exceptions.get_random_string", return_value=MOCKED_ID)
+ @conf_vars({("api", "expose_stacktrace"): "True"})
+ def test_data_error_exposes_db_internals_with_stacktrace(
+ self,
+ mock_get_random_string,
+ orig_msg: str,
+ ) -> None:
+ exc = self._make_data_error(orig_msg)
+ with pytest.raises(HTTPException) as exc_info:
+ self.handler.exception_handler(Mock(), exc)
+ assert exc_info.value.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
+ detail = exc_info.value.detail
+ assert isinstance(detail, dict)
+ assert detail["reason"] == "Value rejected by database"
+ assert detail["statement"] == self._STATEMENT
+ assert detail["orig_error"] == orig_msg
+
+ @conf_vars({("api", "expose_stacktrace"): "False"})
+ def test_data_error_dispatched_through_fastapi_app(self) -> None:
+ """End-to-end: a route raising DataError returns 422 via the registered handler."""
+ app = FastAPI()
+ for h in ERROR_HANDLERS:
+ app.add_exception_handler(h.exception_cls, h.exception_handler)
+
+ @app.post("/test")
+ def trigger_data_error():
+ raise self._make_data_error("(1406, \"Data too long for column 'conf' at row 1\")")
+
+ response = TestClient(app, raise_server_exceptions=False).post("/test")
+ assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
+ detail = response.json()["detail"]
+ assert detail["reason"] == "Value rejected by database"
+ assert detail["statement"] == "hidden"
+ assert detail["orig_error"] == "hidden"
+
+
class TestDagErrorHandler:
@pytest.mark.parametrize(
"cause",
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 4d21bb406f9..9ce0f9dabf5 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1675,6 +1675,85 @@ class TestTIUpdateState:
ti = session.get(TaskInstance, ti.id)
assert ti.state == State.FAILED
+ # DataError handling: an oversized field the DB rejects must surface as 422 (not an opaque 500
+ # or a silent mark-FAILED). Only MySQL/Postgres enforce varchar limits; SQLite ignores them.
+ @pytest.mark.backend("mysql", "postgres")
+ def test_ti_run_oversized_field_returns_422(self, client, session, create_task_instance):
+ """ti_run: a DataError on the running-state UPDATE surfaces as 422, not 500."""
+ ti = create_task_instance(
+ task_id="test_ti_run_dataerror",
+ state=State.QUEUED,
+ dagrun_state=DagRunState.RUNNING,
+ session=session,
+ dag_id=str(uuid4()),
+ )
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/run",
+ json={
+ "state": "running",
+ "hostname": "h" * 1100, # > hostname varchar(1000)
+ "unixname": "u",
+ "pid": 1,
+ "start_date": "2024-09-30T12:00:00Z",
+ },
+ )
+ assert response.status_code == 422
+ assert response.json()["detail"]["reason"] == "Value rejected by database"
+
+ @pytest.mark.backend("mysql", "postgres")
+ def test_ti_update_state_oversized_field_returns_422(self, client, session, create_task_instance):
+ """ti_update_state: a DataError on the state UPDATE surfaces as 422, not 500."""
+ ti = create_task_instance(
+ task_id="test_ti_update_dataerror",
+ state=State.RUNNING,
+ dagrun_state=DagRunState.RUNNING,
+ session=session,
+ dag_id=str(uuid4()),
+ )
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/state",
+ json={
+ "state": "up_for_retry",
+ "end_date": "2024-09-30T12:00:00Z",
+ "rendered_map_index": "x" * 300, # > rendered_map_index varchar(250)
+ },
+ )
+ assert response.status_code == 422
+ assert response.json()["detail"]["reason"] == "Value rejected by database"
+
+ @pytest.mark.backend("mysql", "postgres")
+ def test_ti_update_state_dataerror_does_not_silently_fail(self, client, session, create_task_instance):
+ """A DataError raised while building the state update must not silently mark the TI FAILED."""
+ ti = create_task_instance(
+ task_id="test_ti_defer_dataerror",
+ state=State.RUNNING,
+ dagrun_state=DagRunState.RUNNING,
+ session=session,
+ dag_id=str(uuid4()),
+ )
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/state",
+ json={
+ "state": "deferred",
+ "classpath": "c" * 1100, # > trigger.classpath varchar(1000)
+ "next_method": "execute_complete",
+ "trigger_kwargs": {},
+ "next_kwargs": {},
+ },
+ )
+ assert response.status_code == 422
+ assert response.json()["detail"]["reason"] == "Value rejected by database"
+
+ # The bare `except Exception` would have marked this FAILED behind the worker's back.
+ session.expire_all()
+ assert session.get(TaskInstance, ti.id).state == State.RUNNING
+
def test_ti_update_state_handle_retry(self, client, session, create_task_instance):
ti = create_task_instance(
task_id="test_ti_update_state_to_retry",