This is an automated email from the ASF dual-hosted git repository.

pankajkoti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a07d799482 Change dataset URI validation to raise warning instead of 
error in Airflow 2.9 (#39670)
a07d799482 is described below

commit a07d7994827db5b869f340037e39cb81e27a8b8b
Author: Tatiana Al-Chueyr <tatiana.alchu...@gmail.com>
AuthorDate: Fri May 17 07:51:15 2024 +0100

    Change dataset URI validation to raise warning instead of error in Airflow 
2.9 (#39670)
    
    
    Closes: #39486
    
    # Context
    
    Valid DAGs that worked in Airflow 2.8.x and had tasks with outlets with
specific URIs, such as
`Dataset("postgres://postgres:5432/postgres.dbt.stg_customers")`, stopped
working in Airflow 2.9.0 & Airflow 2.9.1, after #37005 was merged.
    
    This was a breaking change in an Airflow minor version. We should avoid 
this.
    
    Airflow < 3.0 should raise a warning, and from Airflow 3.0, we can raise
errors by default. We can have a feature flag so that users who want to see
this in advance can enable errors in Airflow 2.x, but this should not be the
default behaviour.
    
    The DAGs should continue working on Airflow 2.x minor/micro releases 
without errors (unless the user opts in via configuration).
    
    # How to reproduce
    
    By running the following DAG with `apache-airflow==2.9.1` and 
`apache-airflow-providers-postgres==5.11.0`, as an example:
    ```
    from datetime import datetime
    
    from airflow import DAG
    from airflow.datasets import Dataset
    from airflow.operators.empty import EmptyOperator
    
    
    
    with DAG(dag_id='empty_operator_example', start_date=datetime(2022, 1, 1), 
schedule_interval=None) as dag:
    
        task1 = EmptyOperator(
            task_id='empty_task1',
            dag=dag,
            
outlets=[Dataset("postgres://postgres:5432/postgres.dbt.stg_customers")]
        )
    
        task2 = EmptyOperator(
            task_id='empty_task2',
            dag=dag
        )
    
        task1 >> task2
    ```
    
    Causes the exception:
    ```
    Broken DAG: [/usr/local/airflow/dags/example_issue.py]
    Traceback (most recent call last):
      File 
"/usr/local/lib/python3.11/site-packages/airflow/datasets/__init__.py", line 
81, in _sanitize_uri
        parsed = normalizer(parsed)
                 ^^^^^^^^^^^^^^^^^^
      File 
"/usr/local/lib/python3.11/site-packages/airflow/providers/postgres/datasets/postgres.py",
 line 34, in sanitize_uri
        raise ValueError("URI format postgres:// must contain database, schema, 
and table names")
    ValueError: URI format postgres:// must contain database, schema, and table 
names
    ```
    
    # About the changes introduced
    
    This PR introduces the following:
    
    1. A boolean configuration within `[core]`, named
`strict_dataset_uri_validation`, which should be `False` by default.
    
    2. When this configuration is `False`, Airflow should raise a warning
saying:
    ```
    From Airflow 3, Airflow will be more strict with Dataset URIs, and the URI 
xx will no longer be valid. Please, follow the expected standard as documented 
in XX.
    ```
    
    3. If this configuration is `True`, Airflow should raise the exception, as
it does now in Airflow 2.9.0 and 2.9.1.
    
    4. From Airflow 3.0, we change this configuration to be `True` by default.
---
 airflow/config_templates/config.yml |  9 +++++++++
 airflow/datasets/__init__.py        | 16 +++++++++++++++-
 tests/datasets/test_dataset.py      | 32 +++++++++++++++++++++++++++++++-
 3 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index 36fb176e95..1db38932e6 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -464,6 +464,15 @@ core:
       sensitive: true
       default: ~
       example: '{"some_param": "some_value"}'
+    strict_dataset_uri_validation:
+      description: |
+        Dataset URI validation should raise an exception if it is not 
compliant with AIP-60.
+        By default this configuration is false, meaning that Airflow 2.x only 
warns the user.
+        In Airflow 3, this configuration will be enabled by default.
+      default: "False"
+      example: ~
+      version_added: 2.9.2
+      type: boolean
     database_access_isolation:
       description: (experimental) Whether components should use Airflow 
Internal API for DB connectivity.
       version_added: 2.6.0
diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 37720314d3..d8012e6874 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -27,6 +27,9 @@ import attr
 if TYPE_CHECKING:
     from urllib.parse import SplitResult
 
+
+from airflow.configuration import conf
+
 __all__ = ["Dataset", "DatasetAll", "DatasetAny"]
 
 
@@ -87,7 +90,18 @@ def _sanitize_uri(uri: str) -> str:
         fragment="",  # Ignore any fragments.
     )
     if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None:
-        parsed = normalizer(parsed)
+        try:
+            parsed = normalizer(parsed)
+        except ValueError as exception:
+            if conf.getboolean("core", "strict_dataset_uri_validation", 
fallback=False):
+                raise exception
+            else:
+                warnings.warn(
+                    f"The dataset URI {uri} is not AIP-60 compliant. "
+                    f"In Airflow 3, this will raise an exception. More 
information: {repr(exception)}",
+                    UserWarning,
+                    stacklevel=3,
+                )
     return urllib.parse.urlunsplit(parsed)
 
 
diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py
index 31aebff9fa..5453d971d9 100644
--- a/tests/datasets/test_dataset.py
+++ b/tests/datasets/test_dataset.py
@@ -20,15 +20,17 @@ from __future__ import annotations
 import os
 from collections import defaultdict
 from typing import Callable
+from unittest.mock import patch
 
 import pytest
 from sqlalchemy.sql import select
 
-from airflow.datasets import BaseDataset, Dataset, DatasetAll, DatasetAny
+from airflow.datasets import BaseDataset, Dataset, DatasetAll, DatasetAny, 
_sanitize_uri
 from airflow.models.dataset import DatasetDagRunQueue, DatasetModel
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.operators.empty import EmptyOperator
 from airflow.serialization.serialized_objects import BaseSerialization, 
SerializedDAG
+from tests.test_utils.config import conf_vars
 
 
 @pytest.fixture
@@ -441,3 +443,31 @@ def test_datasets_expression_error(expression: 
Callable[[], None], error: str) -
     with pytest.raises(TypeError) as info:
         expression()
     assert str(info.value) == error
+
+
+def mock_get_uri_normalizer(normalized_scheme):
+    def normalizer(uri):
+        raise ValueError("Incorrect URI format")
+
+    return normalizer
+
+
+@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer)
+@patch("airflow.datasets.warnings.warn")
+def test__sanitize_uri_raises_warning(mock_warn):
+    _sanitize_uri("postgres://localhost:5432/database.schema.table")
+    msg = mock_warn.call_args.args[0]
+    assert "The dataset URI postgres://localhost:5432/database.schema.table is 
not AIP-60 compliant." in msg
+    assert (
+        "In Airflow 3, this will raise an exception. More information: 
ValueError('Incorrect URI format')"
+        in msg
+    )
+
+
+@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer)
+@conf_vars({("core", "strict_dataset_uri_validation"): "True"})
+def test__sanitize_uri_raises_exception():
+    with pytest.raises(ValueError) as e_info:
+        _sanitize_uri("postgres://localhost:5432/database.schema.table")
+    assert isinstance(e_info.value, ValueError)
+    assert str(e_info.value) == "Incorrect URI format"

Reply via email to