This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ab2851bc745 Return a 422 when the database rejects an API payload (#66888)
ab2851bc745 is described below

commit ab2851bc7452f34caad35de6b68c082725b53225
Author: Stefan Wang <[email protected]>
AuthorDate: Fri Jun 12 14:46:29 2026 -0700

    Return a 422 when the database rejects an API payload (#66888)
    
    * Translate DataError to 413/422 instead of generic 500
    
    Triggering a DAG run with an oversized 'conf' payload (and other
    DB-rejected writes across the API surface) currently produces a generic
    500. The SQL error surfaces deep in SQLAlchemy as
    (1406, "Data too long for column 'conf' at row 1") on MySQL, the
    caller has no signal that payload size was the cause, and every write
    endpoint that touches a length-capped column has the same shape today
    (Connection.extra, Variable.val, XCom.value, TaskInstance.note, HITL
    fields, etc.).
    
    Add a single FastAPI exception handler for sqlalchemy.exc.DataError on
    both the public REST API and the execution API. 'Data too long' /
    'too large' / 'too big' errors map to 413 Content Too Large; other
    DataErrors (out-of-range, numeric overflow) map to 422. The response
    body carries the original DB error and an actionable hint pointing at
    either reducing the payload or widening the column type on MySQL.
    
    Every existing and future write endpoint inherits the translation
    automatically. Postgres deployments never hit it for 'conf' (JSONB has
    no length cap); MySQL deployments get a clear 4xx and a remediation hint
    instead of a generic 500.
    
    Closes #66779
    
    Signed-off-by: 1fanwang <[email protected]>
    
    * Rename newsfragment to PR number
    
    Signed-off-by: 1fanwang <[email protected]>
    
    * Fix mypy errors on execution_api handler registration + test indexing
    
    Pass DataError directly to add_exception_handler instead of via the
    BaseErrorHandler.exception_cls attribute (typed as instance T, not
    type[T]) so the call type-checks against Starlette's expected
    type[Exception]. The variance issue between Callable[Request, DataError]
    and Callable[Request, Exception] is silenced with a type-ignore matching
    the existing pattern used in the core_api ERROR_HANDLERS loop.
    
    In the new TestDataErrorHandler tests, extract HTTPException.detail
    into a typed dict before subscripting so mypy stops inferring it as str.
    
    Signed-off-by: 1fanwang <[email protected]>
    
    * Simplify DataError handling per review
    
    Drop the 413/'too long' marker special-case and always return 422 to
    match the rest of the app. Add `statement` to the response detail so
    the shape mirrors the unique-constraint handler. Drop the manual
    handler block in the execution API since `init_error_handlers` already
    loops over `ERROR_HANDLERS` for both apps. Rename `_DataErrorHandler`
    to `DataErrorHandler`. Remove the newsfragment.
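Routing everything through one `ERROR_HANDLERS` list means every app that loops over it picks up the new handler without extra code. Below is a stdlib-only toy model of that registration. It assumes Starlette-style dispatch, where the handler is found by walking the raised exception's MRO. `MiniApp`, `Handler`, the stand-in exception classes, and the 409 for the unique-constraint case are all illustrative. Only the `init_error_handlers` name comes from the text above, and this is not Airflow's implementation.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Tuple, Type


# Hypothetical stand-ins for the SQLAlchemy hierarchy: DataError and
# IntegrityError are sibling subclasses of DBAPIError.
class DBAPIError(Exception):
    pass


class DataError(DBAPIError):
    pass


class IntegrityError(DBAPIError):
    pass


Response = Tuple[int, str]  # (status code, reason) in this toy model
HandlerFn = Callable[[Exception], Response]


@dataclass
class Handler:
    exception_cls: Type[Exception]
    handle: HandlerFn


@dataclass
class MiniApp:
    """Toy app: resolves a handler by walking the exception's MRO."""

    handlers: Dict[Type[Exception], HandlerFn] = field(default_factory=dict)

    def add_exception_handler(self, exc_cls: Type[Exception], fn: HandlerFn) -> None:
        self.handlers[exc_cls] = fn

    def dispatch(self, exc: Exception) -> Response:
        for cls in type(exc).__mro__:
            if cls in self.handlers:
                return self.handlers[cls](exc)
        return 500, "Internal Server Error"  # no handler matched: generic 500


# One shared list, in the spirit of ERROR_HANDLERS (statuses illustrative).
ERROR_HANDLERS = [
    Handler(IntegrityError, lambda exc: (409, "Unique constraint violation")),
    Handler(DataError, lambda exc: (422, "Value rejected by database")),
]


def init_error_handlers(app: MiniApp) -> None:
    for h in ERROR_HANDLERS:
        app.add_exception_handler(h.exception_cls, h.handle)


# Both apps receive every handler from the same loop.
core_api, execution_api = MiniApp(), MiniApp()
for app in (core_api, execution_api):
    init_error_handlers(app)

print(execution_api.dispatch(DataError("Data too long")))  # → (422, 'Value rejected by database')
```

A sibling `DBAPIError` subclass with no entry of its own still falls through to the generic 500. That is why `DataError` needs its own handler rather than relying on the `IntegrityError` one.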
    
    * Hide DataError API response internals behind expose_stacktrace
    
    The DataError handler now mirrors _UniqueConstraintErrorHandler: it logs
    the statement and underlying database error under a lookup id and only
    echoes them back to the caller when [api] expose_stacktrace is set. By
    default the response carries an opaque message plus the id, so column
    names and rejected values are no longer leaked to API clients.
    
    Tests cover both expose_stacktrace False (internals hidden) and True
    (internals exposed) across the dialect error shapes.
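The gating comes down to one branch on the setting. Below is a minimal sketch of how the response detail could be built. It assumes the lookup id is passed in (Airflow generates it with `get_random_string()`) and that `expose_stacktrace` has already been read as a bool. `build_data_error_detail` is a hypothetical name, and the traceback that the real handler appends to the log message is left out.

```python
import logging
from typing import Dict

log = logging.getLogger("data_error_sketch")

REASON = "Value rejected by database"


def build_data_error_detail(
    statement: str, orig_error: str, exception_id: str, expose_stacktrace: bool
) -> Dict[str, str]:
    """Hypothetical helper: build the 422 detail, hiding DB internals by default."""
    log_message = f"Error with id {exception_id}, statement: {statement}"
    log.error(log_message)  # internals always reach the server log
    if expose_stacktrace:
        return {
            "reason": REASON,
            "statement": statement,
            "orig_error": orig_error,
            "message": log_message,
        }
    # Default: an opaque message plus the id an operator can search for.
    return {
        "reason": REASON,
        "statement": "hidden",
        "orig_error": "hidden",
        "message": (
            "Serious error when handling your request. Check logs for more details - "
            f"you will find it in api server when you look for ID {exception_id}"
        ),
    }


detail = build_data_error_detail(
    "INSERT INTO dag_run (conf) VALUES (?)",
    "(1406, \"Data too long for column 'conf' at row 1\")",
    exception_id="abc123",
    expose_stacktrace=False,
)
print(detail["statement"], detail["orig_error"])  # → hidden hidden
```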
    
    * Stop task-instance execution routes from masking DataError
    
    ti_run and ti_update_state wrap their DB writes in catches broader than
    DataError: SQLAlchemyError becomes an opaque 500, and a bare Exception in
    ti_update_state silently marks the task FAILED and returns 204. Those fire
    before the app-level DataErrorHandler can translate the rejection, so an
    oversized field (note, rendered_map_index, next_kwargs, ...) either returns
    a generic 500 or leaves the worker believing the update succeeded while the
    server marked the task FAILED.
    
    Re-raise DataError ahead of each broad catch so it reaches the handler and
    the caller gets the actionable 422. The fallbacks for other unexpected
    errors are unchanged.
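The fix relies on Python trying `except` clauses in order. Because `DataError` subclasses `SQLAlchemyError`, any broader clause listed first would catch it. Below is a stdlib-only sketch using stand-in exception classes. `update_ti_state` is a hypothetical route body: for brevity it folds both fallbacks (the opaque 500 and the mark-FAILED-then-204) into a single `try`, which the real routes do not do.

```python
from typing import Dict, Optional


class SQLAlchemyError(Exception):
    """Stand-in for sqlalchemy.exc.SQLAlchemyError."""


class DataError(SQLAlchemyError):
    """Stand-in for sqlalchemy.exc.DataError, a SQLAlchemyError subclass."""


class HTTPError(Exception):
    """Stand-in for fastapi.HTTPException."""

    def __init__(self, status_code: int) -> None:
        super().__init__(status_code)
        self.status_code = status_code


def update_ti_state(ti: Dict[str, str], db_error: Optional[Exception]) -> int:
    """Hypothetical route body; ``db_error`` simulates what the DB write raises."""
    try:
        if db_error is not None:
            raise db_error
        ti["state"] = "success"
    except DataError:
        # Must come before the broader clauses, or they would swallow it.
        raise  # the app-level handler turns this into a 422
    except SQLAlchemyError:
        raise HTTPError(500)  # opaque 500 for other DB failures
    except Exception:
        ti["state"] = "failed"  # broad fallback: mark FAILED...
    return 204  # ...and still report success to the worker
```

Without the first clause, a `DataError` would land in the `SQLAlchemyError` branch and surface as an opaque 500.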
    
    * Tighten DataError docstring and re-raise comments
    
    Drop the per-dialect enumeration in the handler docstring and collapse the
    multi-line re-raise comments to one line each.
    
    * Test DataError handling in the task-instance execution routes
    
    Add MySQL/Postgres-only route tests (SQLite ignores varchar limits): an
    oversized field the database rejects returns 422 from ti_run and
    ti_update_state, and the deferred-state path no longer silently marks the
    TI FAILED.
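The SQLite exclusion can be checked with the standard library alone. SQLite gives a `VARCHAR(n)` column text affinity and does not enforce the declared length, so the oversized write these tests depend on simply succeeds. The table below is a hypothetical stand-in, not Airflow's schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical one-column table with a declared length of 250.
conn.execute("CREATE TABLE ti_sketch (rendered_map_index VARCHAR(250))")
# 300 characters: MySQL (strict mode) or Postgres would reject this write.
conn.execute("INSERT INTO ti_sketch VALUES (?)", ("x" * 300,))
(stored_length,) = conn.execute("SELECT length(rendered_map_index) FROM ti_sketch").fetchone()
print(stored_length)  # → 300, SQLite kept the full value
conn.close()
```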
    
    ---------
    
    Signed-off-by: 1fanwang <[email protected]>
---
 .../src/airflow/api_fastapi/common/exceptions.py   |  54 +++++++++-
 .../execution_api/routes/task_instances.py         |  11 ++-
 .../unit/api_fastapi/common/test_exceptions.py     | 110 ++++++++++++++++++++-
 .../versions/head/test_task_instances.py           |  79 +++++++++++++++
 4 files changed, 249 insertions(+), 5 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/common/exceptions.py b/airflow-core/src/airflow/api_fastapi/common/exceptions.py
index 12d2486253c..efe775e544e 100644
--- a/airflow-core/src/airflow/api_fastapi/common/exceptions.py
+++ b/airflow-core/src/airflow/api_fastapi/common/exceptions.py
@@ -24,7 +24,7 @@ from enum import Enum
 from typing import Generic, TypeVar
 
 from fastapi import HTTPException, Request, status
-from sqlalchemy.exc import IntegrityError
+from sqlalchemy.exc import DataError, IntegrityError
 
 from airflow.configuration import conf
 from airflow.exceptions import DeserializationError
@@ -108,6 +108,52 @@ class _UniqueConstraintErrorHandler(BaseErrorHandler[IntegrityError]):
         return False
 
 
+class DataErrorHandler(BaseErrorHandler[DataError]):
+    """
+    Translate ``sqlalchemy.exc.DataError`` into a 422.
+
+    The database rejected a value that passed Pydantic validation (too long, out
+    of range, or the wrong type for its column), so it is a client error, not a
+    500. The statement and underlying DB error are logged under a lookup id and
+    returned to the caller only when ``[api] expose_stacktrace`` is set, mirroring
+    ``_UniqueConstraintErrorHandler``.
+    """
+
+    def __init__(self):
+        super().__init__(DataError)
+
+    def exception_handler(self, request: Request, exc: DataError):
+        """Handle DataError exception."""
+        exception_id = get_random_string()
+        stacktrace = ""
+        for tb in traceback.format_tb(exc.__traceback__):
+            stacktrace += tb
+
+        log_message = f"Error with id {exception_id}, statement: {exc.statement}\n{stacktrace}"
+        log.error(log_message)
+        if conf.get("api", "expose_stacktrace") == "True":
+            message = log_message
+            statement = str(exc.statement)
+            orig_error = str(exc.orig)
+        else:
+            message = (
+                "Serious error when handling your request. Check logs for more details - "
+                f"you will find it in api server when you look for ID {exception_id}"
+            )
+            statement = "hidden"
+            orig_error = "hidden"
+
+        raise HTTPException(
+            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+            detail={
+                "reason": "Value rejected by database",
+                "statement": statement,
+                "orig_error": orig_error,
+                "message": message,
+            },
+        )
+
+
 class DagErrorHandler(BaseErrorHandler[DeserializationError]):
     """Handler for Dag related errors."""
 
@@ -122,4 +168,8 @@ class DagErrorHandler(BaseErrorHandler[DeserializationError]):
         )
 
 
-ERROR_HANDLERS: list[BaseErrorHandler] = [_UniqueConstraintErrorHandler(), DagErrorHandler()]
+ERROR_HANDLERS: list[BaseErrorHandler] = [
+    _UniqueConstraintErrorHandler(),
+    DataErrorHandler(),
+    DagErrorHandler(),
+]
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 8a91614e09a..7a53a9ddf59 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -35,7 +35,7 @@ from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapProp
 from pydantic import JsonValue
 from sqlalchemy import and_, func, or_, tuple_, update
 from sqlalchemy.engine import CursorResult
-from sqlalchemy.exc import NoResultFound, SQLAlchemyError
+from sqlalchemy.exc import DataError, NoResultFound, SQLAlchemyError
 from sqlalchemy.orm import joinedload
 from sqlalchemy.sql import select
 from structlog.contextvars import bind_contextvars
@@ -314,6 +314,9 @@ def ti_run(
             context.next_method = ti.next_method
             context.next_kwargs = ti.next_kwargs
             context.start_date = ti.start_date
+    except DataError:
+        # Let the app-level DataErrorHandler return a 422 (not the opaque 500 below).
+        raise
     except SQLAlchemyError:
         log.exception("Error marking Task Instance state as running")
         raise HTTPException(
@@ -438,6 +441,9 @@ def ti_update_state(
             dag_id=dag_id,
             dag_bag=dag_bag,
         )
+    except DataError:
+        # Let DataErrorHandler return a 422 instead of silently marking the TI FAILED below.
+        raise
     except Exception:
         # Set a task to failed in case any unexpected exception happened during task state update
         log.exception(
@@ -473,6 +479,9 @@ def ti_update_state(
                 extra=json.dumps({"host_name": hostname}) if hostname else None,
             )
         )
+    except DataError:
+        # Let DataErrorHandler return a 422 (not the opaque 500 below).
+        raise
     except SQLAlchemyError as e:
         log.error("Error updating Task Instance state", error=str(e))
         raise HTTPException(
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py b/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
index fb97f0ac323..7f470e2f019 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
@@ -21,13 +21,16 @@ from typing import TYPE_CHECKING
 from unittest.mock import Mock, patch
 
 import pytest
-from fastapi import HTTPException, status
+from fastapi import FastAPI, HTTPException, status
+from fastapi.testclient import TestClient
 from sqlalchemy import select, update
-from sqlalchemy.exc import IntegrityError
+from sqlalchemy.exc import DataError, IntegrityError
 from sqlalchemy.orm import Session
 
 from airflow.api_fastapi.common.exceptions import (
+    ERROR_HANDLERS,
     DagErrorHandler,
+    DataErrorHandler,
     _DatabaseDialect,
     _UniqueConstraintErrorHandler,
 )
@@ -392,6 +395,109 @@ class TestUniqueConstraintErrorHandler:
         assert exeinfo_response_error.value.detail == expected_exception.detail
 
 
+class TestDataErrorHandler:
+    handler = DataErrorHandler()
+
+    _STATEMENT = "INSERT INTO dag_run (conf) VALUES (?)"
+
+    # One representative ``orig`` message per dialect / rejection class. The handler is
+    # dialect-agnostic (it translates every DataError the same way), so the value only
+    # needs to round-trip through ``orig_error`` when stacktrace exposure is on.
+    _ORIG_MSGS = [
+        pytest.param(
+            "(1406, \"Data too long for column 'conf' at row 1\")",
+            id="mysql-1406-data-too-long",
+        ),
+        pytest.param(
+            "value too long for type character varying(250)",
+            id="postgres-value-too-long",
+        ),
+        pytest.param(
+            "string or blob too big",
+            id="sqlite-blob-too-big",
+        ),
+        pytest.param(
+            "(1264, \"Out of range value for column 'slots' at row 1\")",
+            id="mysql-1264-out-of-range",
+        ),
+        pytest.param(
+            "numeric field overflow",
+            id="postgres-numeric-field-overflow",
+        ),
+        pytest.param(
+            "(1366, \"Incorrect integer value: 'abc' for column 'slots' at row 1\")",
+            id="mysql-1366-incorrect-value",
+        ),
+        pytest.param(
+            "invalid input syntax for type integer",
+            id="postgres-invalid-input-syntax",
+        ),
+    ]
+
+    @staticmethod
+    def _make_data_error(orig_msg: str) -> DataError:
+        return DataError(
+            statement=TestDataErrorHandler._STATEMENT,
+            params={},
+            orig=Exception(orig_msg),
+        )
+
+    @pytest.mark.parametrize("orig_msg", _ORIG_MSGS)
+    @patch("airflow.api_fastapi.common.exceptions.get_random_string", return_value=MOCKED_ID)
+    @conf_vars({("api", "expose_stacktrace"): "False"})
+    def test_data_error_hides_db_internals_without_stacktrace(
+        self,
+        mock_get_random_string,
+        orig_msg: str,
+    ) -> None:
+        exc = self._make_data_error(orig_msg)
+        with pytest.raises(HTTPException) as exc_info:
+            self.handler.exception_handler(Mock(), exc)
+        assert exc_info.value.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
+        assert exc_info.value.detail == {
+            "reason": "Value rejected by database",
+            "statement": "hidden",
+            "orig_error": "hidden",
+            "message": MESSAGE,
+        }
+
+    @pytest.mark.parametrize("orig_msg", _ORIG_MSGS)
+    @patch("airflow.api_fastapi.common.exceptions.get_random_string", return_value=MOCKED_ID)
+    @conf_vars({("api", "expose_stacktrace"): "True"})
+    def test_data_error_exposes_db_internals_with_stacktrace(
+        self,
+        mock_get_random_string,
+        orig_msg: str,
+    ) -> None:
+        exc = self._make_data_error(orig_msg)
+        with pytest.raises(HTTPException) as exc_info:
+            self.handler.exception_handler(Mock(), exc)
+        assert exc_info.value.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
+        detail = exc_info.value.detail
+        assert isinstance(detail, dict)
+        assert detail["reason"] == "Value rejected by database"
+        assert detail["statement"] == self._STATEMENT
+        assert detail["orig_error"] == orig_msg
+
+    @conf_vars({("api", "expose_stacktrace"): "False"})
+    def test_data_error_dispatched_through_fastapi_app(self) -> None:
+        """End-to-end: a route raising DataError returns 422 via the registered handler."""
+        app = FastAPI()
+        for h in ERROR_HANDLERS:
+            app.add_exception_handler(h.exception_cls, h.exception_handler)
+
+        @app.post("/test")
+        def trigger_data_error():
+            raise self._make_data_error("(1406, \"Data too long for column 'conf' at row 1\")")
+
+        response = TestClient(app, raise_server_exceptions=False).post("/test")
+        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
+        detail = response.json()["detail"]
+        assert detail["reason"] == "Value rejected by database"
+        assert detail["statement"] == "hidden"
+        assert detail["orig_error"] == "hidden"
+
+
 class TestDagErrorHandler:
     @pytest.mark.parametrize(
         "cause",
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 4d21bb406f9..9ce0f9dabf5 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1675,6 +1675,85 @@ class TestTIUpdateState:
         ti = session.get(TaskInstance, ti.id)
         assert ti.state == State.FAILED
 
+    # DataError handling: an oversized field the DB rejects must surface as 422 (not an opaque 500
+    # or a silent mark-FAILED). Only MySQL/Postgres enforce varchar limits; SQLite ignores them.
+    @pytest.mark.backend("mysql", "postgres")
+    def test_ti_run_oversized_field_returns_422(self, client, session, create_task_instance):
+        """ti_run: a DataError on the running-state UPDATE surfaces as 422, not 500."""
+        ti = create_task_instance(
+            task_id="test_ti_run_dataerror",
+            state=State.QUEUED,
+            dagrun_state=DagRunState.RUNNING,
+            session=session,
+            dag_id=str(uuid4()),
+        )
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti.id}/run",
+            json={
+                "state": "running",
+                "hostname": "h" * 1100,  # > hostname varchar(1000)
+                "unixname": "u",
+                "pid": 1,
+                "start_date": "2024-09-30T12:00:00Z",
+            },
+        )
+        assert response.status_code == 422
+        assert response.json()["detail"]["reason"] == "Value rejected by database"
+
+    @pytest.mark.backend("mysql", "postgres")
+    def test_ti_update_state_oversized_field_returns_422(self, client, session, create_task_instance):
+        """ti_update_state: a DataError on the state UPDATE surfaces as 422, not 500."""
+        ti = create_task_instance(
+            task_id="test_ti_update_dataerror",
+            state=State.RUNNING,
+            dagrun_state=DagRunState.RUNNING,
+            session=session,
+            dag_id=str(uuid4()),
+        )
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti.id}/state",
+            json={
+                "state": "up_for_retry",
+                "end_date": "2024-09-30T12:00:00Z",
+                "rendered_map_index": "x" * 300,  # > rendered_map_index varchar(250)
+            },
+        )
+        assert response.status_code == 422
+        assert response.json()["detail"]["reason"] == "Value rejected by database"
+
+    @pytest.mark.backend("mysql", "postgres")
+    def test_ti_update_state_dataerror_does_not_silently_fail(self, client, session, create_task_instance):
+        """A DataError raised while building the state update must not silently mark the TI FAILED."""
+        ti = create_task_instance(
+            task_id="test_ti_defer_dataerror",
+            state=State.RUNNING,
+            dagrun_state=DagRunState.RUNNING,
+            session=session,
+            dag_id=str(uuid4()),
+        )
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti.id}/state",
+            json={
+                "state": "deferred",
+                "classpath": "c" * 1100,  # > trigger.classpath varchar(1000)
+                "next_method": "execute_complete",
+                "trigger_kwargs": {},
+                "next_kwargs": {},
+            },
+        )
+        assert response.status_code == 422
+        assert response.json()["detail"]["reason"] == "Value rejected by database"
+
+        # The bare `except Exception` would have marked this FAILED behind the worker's back.
+        session.expire_all()
+        assert session.get(TaskInstance, ti.id).state == State.RUNNING
+
     def test_ti_update_state_handle_retry(self, client, session, create_task_instance):
         ti = create_task_instance(
             task_id="test_ti_update_state_to_retry",
