This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 221cdd64056 Fix SQLToolset read-only mode bypass via data-modifying CTEs and SELECT INTO (#64173)
221cdd64056 is described below

commit 221cdd640567bc0f8fb9990b1f81258446308182
Author: Elad Kalif <[email protected]>
AuthorDate: Thu Mar 26 03:24:32 2026 +0200

    Fix SQLToolset read-only mode bypass via data-modifying CTEs and SELECT INTO (#64173)
    
    * Fix SQLToolset read-only mode bypass via data-modifying CTEs and SELECT INTO
    
    * fixes
---
 .../providers/common/ai/utils/sql_validation.py    | 43 +++++++++++++++++
 .../unit/common/ai/utils/test_sql_validation.py    | 54 ++++++++++++++++++++++
 2 files changed, 97 insertions(+)

diff --git a/providers/common/ai/src/airflow/providers/common/ai/utils/sql_validation.py b/providers/common/ai/src/airflow/providers/common/ai/utils/sql_validation.py
index 42d41dcfc7d..3315eff1dcc 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/utils/sql_validation.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/utils/sql_validation.py
@@ -39,6 +39,22 @@ DEFAULT_ALLOWED_TYPES: tuple[type[exp.Expr], ...] = (
     exp.Except,
 )
 
+# Denylist: expression types that mutate data or schema when found anywhere in the AST.
+# This catches data-modifying CTEs (e.g. WITH del AS (DELETE …) SELECT …),
+# SELECT INTO, and other constructs that bypass top-level type checks.
+# Note: exp.Command is sqlglot's fallback for any syntax it doesn't recognize.
+# Including it makes the denylist fail-closed (safer), but may block legitimate
+# vendor-specific SQL that sqlglot can't parse. Callers who need such syntax can
+# provide custom allowed_types to bypass the deep scan entirely.
+_DATA_MODIFYING_NODES: tuple[type[exp.Expr], ...] = (
+    exp.Insert,
+    exp.Update,
+    exp.Delete,
+    exp.Merge,
+    exp.Into,
+    exp.Command,
+)
+
 
 class SQLSafetyError(Exception):
     """Generated SQL failed safety validation."""
@@ -97,4 +113,31 @@ def validate_sql(
                 f"Statement type '{type(stmt).__name__}' is not allowed. Allowed types: {allowed_names}"
             )
 
+    # Deep scan: reject data-modifying nodes hidden inside otherwise-allowed statements
+    # (e.g. data-modifying CTEs, SELECT INTO). Only applies when using the default
+    # read-only allowlist — callers who provide custom allowed_types have explicitly
+    # opted into non-read-only operations.
+    if types is DEFAULT_ALLOWED_TYPES:
+        _check_for_data_modifying_nodes(parsed)
+
     return parsed
+
+
+def _check_for_data_modifying_nodes(statements: list[exp.Expr]) -> None:
+    """
+    Walk the full AST of each statement and reject data-modifying expressions.
+
+    This catches bypass vectors like:
+    - Data-modifying CTEs: ``WITH d AS (DELETE FROM t RETURNING *) SELECT * FROM d``
+    - SELECT INTO: ``SELECT * INTO new_table FROM t``
+    - INSERT/UPDATE/DELETE hidden inside subqueries or CTEs
+
+    :raises SQLSafetyError: If any data-modifying node is found in the AST.
+    """
+    for stmt in statements:
+        for node in stmt.walk():
+            if isinstance(node, _DATA_MODIFYING_NODES):
+                raise SQLSafetyError(
+                    f"Data-modifying operation '{type(node).__name__}' found inside statement. "
+                    f"Only pure read operations are allowed in read-only mode."
+                )
diff --git a/providers/common/ai/tests/unit/common/ai/utils/test_sql_validation.py b/providers/common/ai/tests/unit/common/ai/utils/test_sql_validation.py
index 6c6b218cadb..fe27379aebe 100644
--- a/providers/common/ai/tests/unit/common/ai/utils/test_sql_validation.py
+++ b/providers/common/ai/tests/unit/common/ai/utils/test_sql_validation.py
@@ -159,3 +159,57 @@ class TestValidateSQLEdgeCases:
         """Trailing semicolon should not cause multi-statement error."""
         result = validate_sql("SELECT 1;")
         assert len(result) == 1
+
+
+class TestDataModifyingNodeDetection:
+    """Data-modifying operations hidden inside allowed statement types should be blocked."""
+
+    def test_cte_with_delete_blocked(self):
+        """DELETE inside a CTE bypasses top-level type check."""
+        with pytest.raises(SQLSafetyError, match="Data-modifying operation 'Delete'"):
+            validate_sql(
+                "WITH del AS (DELETE FROM users RETURNING *) SELECT * FROM del",
+                dialect="postgres",
+            )
+
+    def test_cte_with_insert_blocked(self):
+        """INSERT inside a CTE bypasses top-level type check."""
+        with pytest.raises(SQLSafetyError, match="Data-modifying operation 'Insert'"):
+            validate_sql(
+                "WITH ins AS (INSERT INTO users(name) VALUES ('x') RETURNING *) SELECT * FROM ins",
+                dialect="postgres",
+            )
+
+    def test_cte_with_update_blocked(self):
+        """UPDATE inside a CTE bypasses top-level type check."""
+        with pytest.raises(SQLSafetyError, match="Data-modifying operation 'Update'"):
+            validate_sql(
+                "WITH upd AS (UPDATE users SET name = 'x' RETURNING *) SELECT * FROM upd",
+                dialect="postgres",
+            )
+
+    def test_select_into_blocked(self):
+        """SELECT INTO creates a new table — should be blocked."""
+        with pytest.raises(SQLSafetyError, match="Data-modifying operation 'Into'"):
+            validate_sql("SELECT * INTO new_table FROM users")
+
+    def test_plain_cte_select_still_allowed(self):
+        """Normal read-only CTEs should not be affected."""
+        result = validate_sql("WITH t AS (SELECT id FROM users) SELECT * FROM t")
+        assert len(result) == 1
+
+    def test_nested_subquery_select_still_allowed(self):
+        """Subqueries that are pure reads should not be affected."""
+        result = validate_sql("SELECT * FROM users WHERE id IN (SELECT user_id FROM orders)")
+        assert len(result) == 1
+
+    def test_deep_scan_runs_with_explicit_default_types(self):
+        """Deep scan should also block when DEFAULT_ALLOWED_TYPES is passed explicitly."""
+        from airflow.providers.common.ai.utils.sql_validation import DEFAULT_ALLOWED_TYPES
+
+        with pytest.raises(SQLSafetyError, match="Data-modifying operation 'Delete'"):
+            validate_sql(
+                "WITH del AS (DELETE FROM users RETURNING *) SELECT * FROM del",
+                allowed_types=DEFAULT_ALLOWED_TYPES,
+                dialect="postgres",
+            )

Reply via email to