Copilot commented on code in PR #52976:
URL: https://github.com/apache/airflow/pull/52976#discussion_r2281017969
##########
providers/postgres/src/airflow/providers/postgres/hooks/postgres.py:
##########
@@ -36,20 +36,66 @@
from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.providers.postgres.dialects.postgres import PostgresDialect
+USE_PSYCOPG3: bool
+try:
+ import psycopg as psycopg # needed for patching in unit tests
+ import sqlalchemy
+ from packaging.version import Version
+
+ sqlalchemy_version = Version(sqlalchemy.__version__)
+ is_sqla2 = (sqlalchemy_version.major, sqlalchemy_version.minor, sqlalchemy_version.micro) >= (2, 0, 0)
+
+ USE_PSYCOPG3 = bool(psycopg) and is_sqla2
Review Comment:
bool(psycopg) is always True for an imported module, so it does not test
whether the import succeeded. If the import fails, execution never reaches
this line, which makes the check redundant. Set USE_PSYCOPG3 from is_sqla2
alone, or from a flag that is set when the import succeeds.
```suggestion
USE_PSYCOPG3 = is_sqla2
```
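A minimal sketch of the flag-based alternative mentioned above, using only the standard library. The helper names (module_available, should_use_psycopg3) are hypothetical and not part of the PR, and the version check is simplified to the major number for illustration.

```python
import importlib.util
import json


def module_available(name: str) -> bool:
    """Return True if a top-level module can be found, without importing it."""
    return importlib.util.find_spec(name) is not None


def should_use_psycopg3(psycopg_available: bool, sqlalchemy_version: str) -> bool:
    """Hypothetical helper: require both psycopg and SQLAlchemy 2.x or newer."""
    major = int(sqlalchemy_version.split(".")[0])
    return psycopg_available and major >= 2


# A module object is always truthy, so bool(module) carries no information
# about whether an optional dependency is installed.
module_is_truthy = bool(json)

# The flag comes from an explicit availability check instead.
example_flag = should_use_psycopg3(module_available("json"), "2.0.0")
```

The availability check is kept separate from the version check so each condition can be tested and patched on its own.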
##########
providers/postgres/src/airflow/providers/postgres/hooks/postgres.py:
##########
@@ -421,10 +562,12 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
def get_db_log_messages(self, conn) -> None:
"""
- Log all database messages sent to the client during the session.
+ Log database messages.
- :param conn: Connection object
+ For psycopg3, this is a no-op as notices are handled in real-time by the notice_handler.
"""
- if self.enable_log_db_messages:
- for output in conn.notices:
- self.log.info(output)
+ if USE_PSYCOPG3 or not self.enable_log_db_messages:
+ return
+
+ for output in conn.notices:
+ self.log.info(output)
Review Comment:
This returns early under psycopg3 even when enable_log_db_messages is True.
Unless a notice handler is registered on the connection elsewhere, no
database messages are logged. Handle psycopg3 notices explicitly rather than
skipping them.
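One possible shape for this, as a sketch only: psycopg 3 connections expose add_notice_handler(), and the callback receives a diagnostic object with severity and message_primary attributes. The function attach_notice_logging and the FakeConnection stub below are hypothetical, written so the example runs without a database; they are not the PR's code.

```python
import logging
from types import SimpleNamespace


def attach_notice_logging(conn, log: logging.Logger, enabled: bool) -> bool:
    """Register a notice handler on conn when logging is enabled.

    Returns True if a handler was registered, False otherwise.
    """
    if not enabled:
        return False

    def _log_notice(diag) -> None:
        # diag mimics psycopg's Diagnostic: severity and message_primary.
        log.info("%s: %s", diag.severity, diag.message_primary)

    conn.add_notice_handler(_log_notice)
    return True


class FakeConnection:
    """Hypothetical stand-in for a psycopg 3 connection, for illustration."""

    def __init__(self) -> None:
        self.handlers = []

    def add_notice_handler(self, callback) -> None:
        self.handlers.append(callback)

    def emit(self, severity: str, message: str) -> None:
        # Simulate the server sending a notice to every registered handler.
        diag = SimpleNamespace(severity=severity, message_primary=message)
        for handler in self.handlers:
            handler(diag)
```

With this arrangement, get_db_log_messages could remain a no-op for psycopg3 while enable_log_db_messages still controls whether notices reach the log.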
##########
providers/postgres/src/airflow/providers/postgres/hooks/postgres.py:
##########
@@ -231,6 +325,11 @@ def get_df(
engine = self.get_sqlalchemy_engine()
with engine.connect() as conn:
+ if USE_PSYCOPG3:
+ # Cast the sql to str if it's a list (handle it appropriately)
+ if isinstance(sql, list):
+ sql = "; ".join(sql) # Or handle multiple queries differently
+ return cast("PandasDataFrame", psql.read_sql(sql, con=conn, params=parameters, **kwargs))
return psql.read_sql(sql, con=conn, params=parameters, **kwargs)
Review Comment:
The comment in the psycopg3 branch says a list of SQL statements is handled
appropriately, but the code only joins the statements with "; ". This can
misbehave when the statements need different parameter sets or separate
transactions.
```suggestion
# If sql is a list, execute each statement separately and concatenate results
if isinstance(sql, list):
    dfs = []
    # If parameters is a list/tuple, match each param set to each query
    if isinstance(parameters, (list, tuple)) and len(parameters) == len(sql):
        for stmt, param in zip(sql, parameters):
            df = psql.read_sql(stmt, con=conn, params=param, **kwargs)
            dfs.append(df)
    else:
        for stmt in sql:
            df = psql.read_sql(stmt, con=conn, params=parameters, **kwargs)
            dfs.append(df)
    return cast("PandasDataFrame", pd.concat(dfs, ignore_index=True))
return cast("PandasDataFrame", psql.read_sql(sql, con=conn, params=parameters, **kwargs))
```
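The pairing rule used in the suggestion can be isolated and tested without pandas or a database. The sketch below is illustrative only; pair_statements is a hypothetical helper, not part of the PR.

```python
from collections.abc import Iterator
from typing import Any


def pair_statements(sql, parameters: Any = None) -> Iterator[tuple[str, Any]]:
    """Yield (statement, params) pairs for a single statement or a list.

    If parameters is a list/tuple with one entry per statement, each statement
    gets its own entry; otherwise every statement shares the same parameters.
    """
    if isinstance(sql, str):
        yield sql, parameters
        return

    per_statement = (
        isinstance(parameters, (list, tuple)) and len(parameters) == len(sql)
    )
    for index, stmt in enumerate(sql):
        yield stmt, (parameters[index] if per_statement else parameters)
```

Note the ambiguity this rule inherits: a single shared tuple of positional parameters whose length happens to equal the number of statements is treated as one entry per statement. An explicit keyword or a dedicated type would avoid that guess.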