kacpermuda commented on code in PR #67457:
URL: https://github.com/apache/airflow/pull/67457#discussion_r3303746640


##########
providers/jdbc/src/airflow/providers/jdbc/hooks/jdbc.py:
##########
@@ -262,3 +262,74 @@ def get_uri(self) -> str:
                 uri = f"{uri}?{query_string}"
 
         return uri
+
+    def get_openlineage_database_info(self, connection):
+        """
+        Return JDBC database information for OpenLineage.
+
+        Returns ``None`` when the database scheme cannot be inferred from the
+        connection's ``sqlalchemy_scheme`` extra or the JDBC URL in ``host``.
+        """
+        from airflow.providers.openlineage.sqlparser import DatabaseInfo
+
+        scheme = self._get_openlineage_scheme(connection)
+        if not scheme:
+            return None
+
+        return DatabaseInfo(
+            scheme=scheme,
+            authority=self._get_openlineage_authority(connection),
+            database=connection.schema or None,
+        )
+
+    def get_openlineage_database_dialect(self, connection) -> str:
+        """Return SQL dialect inferred from the JDBC connection, or 
``generic``."""
+        return self._get_openlineage_scheme(connection) or "generic"
+
+    def get_openlineage_default_schema(self) -> str | None:
+        """Return default schema from the connection."""
+        return self.connection.schema or None
+
+    @staticmethod
+    def _get_openlineage_scheme(connection) -> str | None:
+        """Infer scheme from ``sqlalchemy_scheme`` extra or JDBC URL prefix."""
+        raw: str | None = None
+        sqlalchemy_scheme = connection.extra_dejson.get("sqlalchemy_scheme")
+        if sqlalchemy_scheme:
+            raw = sqlalchemy_scheme.split("+")[0]
+        else:
+            host = connection.host or ""
+            if host.startswith("jdbc:"):
+                jdbc_part = host[5:]
+                for sep in ("://", ":"):
+                    if sep in jdbc_part:
+                        candidate = jdbc_part.split(sep)[0]
+                        if candidate:
+                            raw = candidate
+                        break
+
+        if not raw:
+            return None
+        # Normalize SQLAlchemy's "postgresql" to OL's canonical "postgres" so
+        # JDBC-Postgres shares a namespace with PostgresHook downstream.
+        return "postgres" if raw == "postgresql" else raw
+
+    @staticmethod
+    def _get_openlineage_authority(connection) -> str | None:
+        """Extract ``host:port`` from a JDBC URL in ``host`` or from plain 
``host``/``port``."""
+        host = connection.host or ""
+        if host.startswith("jdbc:"):
+            rest = host[5:]
+            if "://" not in rest:
+                # Non-standard JDBC URL we can't reliably parse (e.g. Oracle
+                # ``thin:@host:port:sid``, H2 ``mem:test``).
+                return None
+            after = rest.split("://", 1)[1]
+            # Strip path, query string, and driver-specific option separator
+            # (e.g. SQL Server uses ``;`` for connection properties).
+            for sep in ("/", "?", ";"):
+                after = after.split(sep, 1)[0]
+            return after or None

Review Comment:
   If a user embeds userinfo in the JDBC URL (legal for several drivers, e.g. 
`jdbc:postgresql://user:pass@host:5432/db`), `after` ends up as 
`user:pass@host:5432` and gets emitted as part of the OL namespace, which 
leaks the credentials into the emitted lineage events.
   
   We can try stripping userinfo after the path/query/option trim:
   
   ```python
   for sep in ("/", "?", ";"):
       after = after.split(sep, 1)[0]
   if "@" in after:
       after = after.rsplit("@", 1)[-1]
   return after or None
   ```
   
   It would be worth adding a test case for this too.



##########
providers/jdbc/src/airflow/providers/jdbc/hooks/jdbc.py:
##########
@@ -262,3 +262,74 @@ def get_uri(self) -> str:
                 uri = f"{uri}?{query_string}"
 
         return uri
+
+    def get_openlineage_database_info(self, connection):
+        """
+        Return JDBC database information for OpenLineage.
+
+        Returns ``None`` when the database scheme cannot be inferred from the
+        connection's ``sqlalchemy_scheme`` extra or the JDBC URL in ``host``.
+        """
+        from airflow.providers.openlineage.sqlparser import DatabaseInfo
+
+        scheme = self._get_openlineage_scheme(connection)
+        if not scheme:
+            return None
+
+        return DatabaseInfo(
+            scheme=scheme,
+            authority=self._get_openlineage_authority(connection),
+            database=connection.schema or None,
+        )
+
+    def get_openlineage_database_dialect(self, connection) -> str:
+        """Return SQL dialect inferred from the JDBC connection, or 
``generic``."""
+        return self._get_openlineage_scheme(connection) or "generic"
+
+    def get_openlineage_default_schema(self) -> str | None:
+        """Return default schema from the connection."""
+        return self.connection.schema or None
+
+    @staticmethod
+    def _get_openlineage_scheme(connection) -> str | None:
+        """Infer scheme from ``sqlalchemy_scheme`` extra or JDBC URL prefix."""
+        raw: str | None = None
+        sqlalchemy_scheme = connection.extra_dejson.get("sqlalchemy_scheme")
+        if sqlalchemy_scheme:
+            raw = sqlalchemy_scheme.split("+")[0]
+        else:
+            host = connection.host or ""
+            if host.startswith("jdbc:"):
+                jdbc_part = host[5:]
+                for sep in ("://", ":"):
+                    if sep in jdbc_part:
+                        candidate = jdbc_part.split(sep)[0]
+                        if candidate:
+                            raw = candidate
+                        break
+
+        if not raw:
+            return None
+        # Normalize SQLAlchemy's "postgresql" to OL's canonical "postgres" so
+        # JDBC-Postgres shares a namespace with PostgresHook downstream.
+        return "postgres" if raw == "postgresql" else raw
+
+    @staticmethod
+    def _get_openlineage_authority(connection) -> str | None:
+        """Extract ``host:port`` from a JDBC URL in ``host`` or from plain 
``host``/``port``."""
+        host = connection.host or ""
+        if host.startswith("jdbc:"):
+            rest = host[5:]
+            if "://" not in rest:

Review Comment:
   `jdbc:oracle:thin:@//host:1521/service` is a standard and increasingly 
common Oracle JDBC URL, but it doesn't contain the literal substring `://` (it 
has `:@//`), so this guard returns `None` and the authority is lost. The scheme 
is still detected as `oracle`, so we end up emitting events with no authority 
for a perfectly parseable URL.
   
   We can handle it before the `://` check:
   
   ```python
   if "@//" in rest:
       after = rest.split("@//", 1)[1]
       for sep in ("/", "?", ";"):
           after = after.split(sep, 1)[0]
       return after or None
   if "://" not in rest:
       return None
   ```
   
   I would also add a parametrized test case for the `@//` form alongside the 
existing Oracle thin / H2 cases.



##########
providers/jdbc/src/airflow/providers/jdbc/hooks/jdbc.py:
##########
@@ -262,3 +262,74 @@ def get_uri(self) -> str:
                 uri = f"{uri}?{query_string}"
 
         return uri
+
+    def get_openlineage_database_info(self, connection):
+        """
+        Return JDBC database information for OpenLineage.
+
+        Returns ``None`` when the database scheme cannot be inferred from the
+        connection's ``sqlalchemy_scheme`` extra or the JDBC URL in ``host``.
+        """
+        from airflow.providers.openlineage.sqlparser import DatabaseInfo
+
+        scheme = self._get_openlineage_scheme(connection)
+        if not scheme:
+            return None
+
+        return DatabaseInfo(
+            scheme=scheme,
+            authority=self._get_openlineage_authority(connection),
+            database=connection.schema or None,
+        )
+
+    def get_openlineage_database_dialect(self, connection) -> str:
+        """Return SQL dialect inferred from the JDBC connection, or 
``generic``."""
+        return self._get_openlineage_scheme(connection) or "generic"
+
+    def get_openlineage_default_schema(self) -> str | None:
+        """Return default schema from the connection."""
+        return self.connection.schema or None
+
+    @staticmethod
+    def _get_openlineage_scheme(connection) -> str | None:
+        """Infer scheme from ``sqlalchemy_scheme`` extra or JDBC URL prefix."""
+        raw: str | None = None
+        sqlalchemy_scheme = connection.extra_dejson.get("sqlalchemy_scheme")
+        if sqlalchemy_scheme:
+            raw = sqlalchemy_scheme.split("+")[0]
+        else:
+            host = connection.host or ""
+            if host.startswith("jdbc:"):

Review Comment:
   Minor: per the JDBC spec the `jdbc:` prefix is case-insensitive, but 
`startswith("jdbc:")` is case-sensitive — `JDBC:postgresql://...` would fall 
through here (and in `_get_openlineage_authority`). This is consistent with the 
existing `get_uri()` above so it's not a regression, but worth normalizing 
(`host.lower().startswith("jdbc:")`) if you want to handle uppercase URLs that 
some tools produce.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to