This is an automated email from the ASF dual-hosted git repository.

dabla pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b183a4cb7b7 Use execute_values instead of execute_batch for better 
bulk insert performance with PostgresHook (#68207)
b183a4cb7b7 is described below

commit b183a4cb7b7e03f5ae596a3902313e9f17485ce1
Author: David Blain <[email protected]>
AuthorDate: Tue Jun 9 20:01:20 2026 +0200

    Use execute_values instead of execute_batch for better bulk insert 
performance with PostgresHook (#68207)
    
    * refactor: In PostgresHook.insert_rows, use execute_values instead of
    execute_batch for better bulk insert performance with psycopg2 when
    fast_executemany is enabled (not applied when psycopg3 is used)
    
    * refactor: Log a warning when fast_executemany is enabled while
    psycopg3 is in use, since the option has no effect there
---
 .../airflow/providers/postgres/hooks/postgres.py   | 37 ++++++++++++-----
 .../tests/unit/postgres/hooks/test_postgres.py     | 48 ++++++++++++++++++----
 2 files changed, 66 insertions(+), 19 deletions(-)

diff --git 
a/providers/postgres/src/airflow/providers/postgres/hooks/postgres.py 
b/providers/postgres/src/airflow/providers/postgres/hooks/postgres.py
index ec28bcc834f..66f692eb004 100644
--- a/providers/postgres/src/airflow/providers/postgres/hooks/postgres.py
+++ b/providers/postgres/src/airflow/providers/postgres/hooks/postgres.py
@@ -25,7 +25,7 @@ from typing import TYPE_CHECKING, Any, Literal, Protocol, 
TypeAlias, cast, overl
 
 from more_itertools import chunked
 from psycopg2 import connect as ppg2_connect
-from psycopg2.extras import DictCursor, NamedTupleCursor, RealDictCursor, 
execute_batch
+from psycopg2.extras import DictCursor, NamedTupleCursor, RealDictCursor, 
execute_values
 
 from airflow.providers.common.compat.sdk import (
     AirflowException,
@@ -660,6 +660,10 @@ class PostgresHook(DbApiHook):
         """
         Insert a collection of tuples into a table.
 
+        When ``fast_executemany=True`` with psycopg2, uses ``execute_values`` 
which batches
+        all rows into a single INSERT statement for better performance.
+        For psycopg3, the default ``executemany`` already uses pipelining for 
high performance.
+
         Rows are inserted in chunks, each chunk (of size ``commit_every``) is
         done in a new transaction.
 
@@ -668,20 +672,29 @@ class PostgresHook(DbApiHook):
         :param target_fields: The names of the columns to fill in the table
         :param commit_every: The maximum number of rows to insert in one
             transaction. Set to 0 to insert all rows in one transaction.
-        :param replace: Whether to replace instead of insert
+        :param replace: Whether to replace instead of insert (uses ON CONFLICT)
         :param executemany: If True, all rows are inserted at once in
             chunks defined by the commit_every parameter. This only works if 
all rows
             have same number of column names, but leads to better performance.
         :param fast_executemany: If True, rows will be inserted using an 
optimized
-            bulk execution strategy (``psycopg2.extras.execute_batch``). This 
can
-            significantly improve performance for large inserts. If set to 
False,
-            the method falls back to the default implementation from
-            ``DbApiHook.insert_rows``.
+            bulk execution strategy (``psycopg2.extras.execute_values``), 
unless psycopg3
+            is being used. This can significantly improve performance for 
large inserts.
+            If set to False or psycopg3 is being used, the method falls back 
to the default
+            implementation from ``DbApiHook.insert_rows``.
         :param autocommit: What to set the connection's autocommit setting to
             before executing the query.
         """
-        # if fast_executemany is disabled, defer to default implementation of 
insert_rows in DbApiHook
-        if not fast_executemany:
+        # psycopg3's executemany already uses pipelining, so use default 
implementation
+        # Only override for psycopg2 with fast_executemany to use 
execute_values
+        if USE_PSYCOPG3 and fast_executemany:
+            self.log.warning(
+                "fast_executemany=True has no effect when using psycopg3. "
+                "psycopg3's executemany already uses pipelining for optimal 
performance."
+            )
+        if USE_PSYCOPG3 or not fast_executemany:
+            # Reset to default format in case a previous fast_executemany call 
failed
+            self._insert_statement_format = "INSERT INTO {} {} VALUES ({})"
+
             return super().insert_rows(
                 table,
                 rows,
@@ -693,9 +706,11 @@ class PostgresHook(DbApiHook):
                 **kwargs,
             )
 
-        # if fast_executemany is enabled, use optimized execute_batch from 
psycopg
+        # if fast_executemany is enabled with psycopg2, use optimized 
execute_values from psycopg
+        self._insert_statement_format = "INSERT INTO {} {} VALUES %s"
+
         nb_rows = 0
-        sql = None  # not generated unless we actually process at least one 
chunk
+        sql: str | None = None  # not generated unless we actually process at 
least one chunk
         with self._create_autocommit_connection(autocommit) as conn:
             conn.commit()
             with closing(conn.cursor()) as cur:
@@ -710,7 +725,7 @@ class PostgresHook(DbApiHook):
                     self.log.debug("Generated sql: %s", sql)
 
                     try:
-                        execute_batch(cur, sql, values, page_size=commit_every)
+                        execute_values(cur, sql, values, 
page_size=commit_every)
                     except Exception as e:
                         self.log.error("Generated sql: %s", sql)
                         self.log.error("Parameters: %s", values)
diff --git a/providers/postgres/tests/unit/postgres/hooks/test_postgres.py 
b/providers/postgres/tests/unit/postgres/hooks/test_postgres.py
index fab8bc6f5d7..f356e3f233e 100644
--- a/providers/postgres/tests/unit/postgres/hooks/test_postgres.py
+++ b/providers/postgres/tests/unit/postgres/hooks/test_postgres.py
@@ -1012,8 +1012,8 @@ class TestPostgresHookPPG2(_BasePostgresHookRuntimeTests):
         assert call_kw["sql"] == f"INSERT INTO {table}  VALUES (%s)"
         assert call_kw["row_count"] == 2
 
-    @mock.patch("airflow.providers.postgres.hooks.postgres.execute_batch")
-    def test_insert_rows_fast_executemany(self, mock_execute_batch):
+    @mock.patch("airflow.providers.postgres.hooks.postgres.execute_values")
+    def test_insert_rows_fast_executemany(self, mock_execute_values):
         table = "table"
         rows = [("hello",), ("world",)]
 
@@ -1025,9 +1025,9 @@ class TestPostgresHookPPG2(_BasePostgresHookRuntimeTests):
         commit_count = 2  # The first and last commit
         assert self.conn.commit.call_count == commit_count
 
-        mock_execute_batch.assert_called_once_with(
+        mock_execute_values.assert_called_once_with(
             self.cur,
-            f"INSERT INTO {table}  VALUES (%s)",  # expected SQL
+            f"INSERT INTO {table}  VALUES %s",  # expected SQL
             [("hello",), ("world",)],  # expected values
             page_size=1000,
         )
@@ -1036,9 +1036,8 @@ class TestPostgresHookPPG2(_BasePostgresHookRuntimeTests):
         self.cur.executemany.assert_not_called()
 
     
@mock.patch("airflow.providers.postgres.hooks.postgres.send_sql_hook_lineage")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.execute_batch")
-    def test_insert_rows_fast_executemany_hook_lineage(self, 
mock_execute_batch, mock_send_lineage):
-
+    @mock.patch("airflow.providers.postgres.hooks.postgres.execute_values")
+    def test_insert_rows_fast_executemany_hook_lineage(self, 
mock_execute_values, mock_send_lineage):
         table = "table"
         rows = [("hello",), ("world",)]
 
@@ -1047,9 +1046,28 @@ class 
TestPostgresHookPPG2(_BasePostgresHookRuntimeTests):
         mock_send_lineage.assert_called_once()
         call_kw = mock_send_lineage.call_args.kwargs
         assert call_kw["context"] is self.db_hook
-        assert call_kw["sql"] == f"INSERT INTO {table}  VALUES (%s)"
+        assert call_kw["sql"] == f"INSERT INTO {table}  VALUES %s"
         assert call_kw["row_count"] == 2
 
+    @mock.patch("airflow.providers.postgres.hooks.postgres.USE_PSYCOPG3", True)
+    @mock.patch("airflow.providers.common.sql.hooks.sql.DbApiHook.insert_rows")
+    def test_insert_rows_fast_executemany_psycopg3_fallback(self, 
mock_super_insert_rows):
+        """Verify psycopg3 falls back to default implementation even with 
fast_executemany=True."""
+        table = "table"
+        rows = [("hello",), ("world",)]
+
+        self.db_hook.insert_rows(table, rows, fast_executemany=True)
+
+        mock_super_insert_rows.assert_called_once_with(
+            table,
+            rows,
+            target_fields=None,
+            commit_every=1000,
+            replace=False,
+            executemany=False,
+            autocommit=False,
+        )
+
     @pytest.mark.usefixtures("reset_logging_config")
     def test_get_all_db_log_messages(self, mocker):
         messages = ["a", "b", "c"]
@@ -1207,3 +1225,17 @@ class 
TestPostgresHookPPG3(_BasePostgresHookRuntimeTests):
             mock_logger.info.assert_any_call("Message from db: 42")
         finally:
             hook.run(sql=f"DROP PROCEDURE {proc_name} (s text)")
+
+    @pytest.mark.usefixtures("reset_logging_config")
+    def test_insert_rows_fast_executemany_psycopg3_logs_warning(self, mocker):
+        mock_logger = 
mocker.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.log")
+
+        table = "table"
+        rows = [("hello",), ("world",)]
+
+        self.db_hook.insert_rows(table, rows, fast_executemany=True)
+
+        mock_logger.warning.assert_called_once_with(
+            "fast_executemany=True has no effect when using psycopg3. "
+            "psycopg3's executemany already uses pipelining for optimal 
performance."
+        )

Reply via email to