Copilot commented on code in PR #52976:
URL: https://github.com/apache/airflow/pull/52976#discussion_r2281017969


##########
providers/postgres/src/airflow/providers/postgres/hooks/postgres.py:
##########
@@ -36,20 +36,66 @@
 from airflow.providers.common.sql.hooks.sql import DbApiHook
 from airflow.providers.postgres.dialects.postgres import PostgresDialect
 
+USE_PSYCOPG3: bool
+try:
+    import psycopg as psycopg  # needed for patching in unit tests
+    import sqlalchemy
+    from packaging.version import Version
+
+    sqlalchemy_version = Version(sqlalchemy.__version__)
+    is_sqla2 = (sqlalchemy_version.major, sqlalchemy_version.minor, sqlalchemy_version.micro) >= (2, 0, 0)
+
+    USE_PSYCOPG3 = bool(psycopg) and is_sqla2

Review Comment:
   `bool(psycopg)` on an imported module always returns True, so it adds 
nothing to the check. This line only runs after `import psycopg` has succeeded 
inside the `try` block, so the import itself already shows that psycopg is 
available. The flag only needs the SQLAlchemy version check.
   ```suggestion
       USE_PSYCOPG3 = is_sqla2
   ```
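
The point about module truthiness can be checked with a short standard-library sketch. The helper `optional_import` and the module name `hypothetical_missing_driver_xyz` are made up for illustration and are not part of the PR; `json` stands in for any importable module.

```python
from __future__ import annotations

import importlib
import types


def optional_import(name: str) -> types.ModuleType | None:
    """Return the imported module, or None when it is not installed."""
    try:
        return importlib.import_module(name)
    except ImportError:
        return None


# Module objects define neither __bool__ nor __len__, so they are always truthy.
json_module = optional_import("json")
missing_module = optional_import("hypothetical_missing_driver_xyz")

MODULE_IS_TRUTHY = bool(json_module)       # carries no extra information
HAS_JSON = json_module is not None         # the meaningful availability check
HAS_MISSING = missing_module is not None   # False: the import failed
```

Availability is decided by whether the import raised, not by the truth value of the module object.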



##########
providers/postgres/src/airflow/providers/postgres/hooks/postgres.py:
##########
@@ -421,10 +562,12 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
 
     def get_db_log_messages(self, conn) -> None:
         """
-        Log all database messages sent to the client during the session.
+        Log database messages.
 
-        :param conn: Connection object
+        For psycopg3, this is a no-op as notices are handled in real-time by the notice_handler.
         """
-        if self.enable_log_db_messages:
-            for output in conn.notices:
-                self.log.info(output)
+        if USE_PSYCOPG3 or not self.enable_log_db_messages:
+            return
+
+        for output in conn.notices:
+            self.log.info(output)

Review Comment:
   The early return applies to psycopg3 even when `enable_log_db_messages` is 
True, so this method never logs anything on that path. psycopg3 notice logging 
should be handled explicitly rather than skipped entirely.
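
One way to log psycopg3 notices is to register a callback, since psycopg 3 connections expose `add_notice_handler` and pass the callback a diagnostic object with `severity` and `message_primary` attributes. The sketch below is an assumption about how that could be wired, not the PR's implementation: `make_notice_logger` is a hypothetical helper, and `FakeConnection` is a stand-in so the example runs without a database.

```python
import logging
from types import SimpleNamespace
from typing import Any, Callable

log = logging.getLogger("postgres_hook_sketch")


def make_notice_logger(logger: logging.Logger, enabled: bool) -> Callable[[Any], None]:
    """Build a notice callback that logs only when message logging is enabled."""

    def handle_notice(diag: Any) -> None:
        if enabled:
            logger.info("%s: %s", diag.severity, diag.message_primary)

    return handle_notice


class FakeConnection:
    """Stand-in for a psycopg 3 connection; mimics only add_notice_handler."""

    def __init__(self) -> None:
        self.handlers: list = []

    def add_notice_handler(self, callback: Callable[[Any], None]) -> None:
        self.handlers.append(callback)

    def emit(self, severity: str, message: str) -> None:
        # Simulates the server sending a notice to every registered handler.
        diag = SimpleNamespace(severity=severity, message_primary=message)
        for handler in self.handlers:
            handler(diag)
```

With a real connection, the same callback would be passed to `conn.add_notice_handler(...)` right after connecting, which keeps the `enable_log_db_messages` switch meaningful on the psycopg3 path.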



##########
providers/postgres/src/airflow/providers/postgres/hooks/postgres.py:
##########
@@ -231,6 +325,11 @@ def get_df(
 
             engine = self.get_sqlalchemy_engine()
             with engine.connect() as conn:
+                if USE_PSYCOPG3:
+                    # Cast the sql to str if it's a list (handle it appropriately)
+                    if isinstance(sql, list):
+                        sql = "; ".join(sql)  # Or handle multiple queries differently
+                    return cast("PandasDataFrame", psql.read_sql(sql, con=conn, params=parameters, **kwargs))
                 return psql.read_sql(sql, con=conn, params=parameters, **kwargs)

Review Comment:
   The psycopg3-specific code block contains a comment about handling SQL as a 
list but the implementation simply joins with semicolons. This could lead to 
unexpected behavior if multiple queries have different parameter sets or 
transaction requirements.
   ```suggestion
                       # If sql is a list, execute each statement separately and concatenate results
                       if isinstance(sql, list):
                           dfs = []
                           # If parameters is a list/tuple, match each param set to each query
                           if isinstance(parameters, (list, tuple)) and len(parameters) == len(sql):
                               for stmt, param in zip(sql, parameters):
                                   df = psql.read_sql(stmt, con=conn, params=param, **kwargs)
                                   dfs.append(df)
                           else:
                               for stmt in sql:
                                   df = psql.read_sql(stmt, con=conn, params=parameters, **kwargs)
                                   dfs.append(df)
                           return cast("PandasDataFrame", pd.concat(dfs, ignore_index=True))
                       return cast("PandasDataFrame", psql.read_sql(sql, con=conn, params=parameters, **kwargs))
   ```
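
One caveat with matching parameters to statements by length: a single shared parameter tuple can have the same length as the statement list by coincidence. The sketch below isolates that pairing rule in a hypothetical helper, `pair_by_length`, that mirrors the condition in the suggestion. The table names and queries are invented for illustration.

```python
from __future__ import annotations

from typing import Any


def pair_by_length(sql: list[str], parameters: Any) -> list[tuple[str, Any]]:
    """Pair statements with parameters using the length-equality heuristic."""
    if isinstance(parameters, (list, tuple)) and len(parameters) == len(sql):
        # Treated as one parameter set per statement.
        return list(zip(sql, parameters))
    # Otherwise the same parameters are reused for every statement.
    return [(stmt, parameters) for stmt in sql]


statements = [
    "SELECT * FROM a WHERE x = %s AND y = %s",
    "SELECT * FROM b WHERE x = %s AND y = %s",
]

# Intended meaning: both statements share the two positional values (1, 2).
shared = (1, 2)
pairs = pair_by_length(statements, shared)
# Because len(shared) == len(statements), each statement receives a single
# scalar (1 or 2) instead of the full tuple.
```

An explicit argument, for example a list of per-statement parameter sets that the caller must always supply, would avoid guessing the caller's intent from lengths.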



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.