This is an automated email from the ASF dual-hosted git repository.
dabla pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b183a4cb7b7 Use execute_values instead of execute_batch for better
bulk insert performance with PostgresHook (#68207)
b183a4cb7b7 is described below
commit b183a4cb7b7e03f5ae596a3902313e9f17485ce1
Author: David Blain <[email protected]>
AuthorDate: Tue Jun 9 20:01:20 2026 +0200
Use execute_values instead of execute_batch for better bulk insert
performance with PostgresHook (#68207)
* refactor: In PostgresHook.insert_rows, use execute_values instead of
execute_batch for better bulk-insert performance with psycopg2 when
fast_executemany is enabled (this path is not used with psycopg3)
* refactor: Log a warning when fast_executemany is enabled while psycopg3
is in use, since the option has no effect there
---
.../airflow/providers/postgres/hooks/postgres.py | 37 ++++++++++++-----
.../tests/unit/postgres/hooks/test_postgres.py | 48 ++++++++++++++++++----
2 files changed, 66 insertions(+), 19 deletions(-)
diff --git
a/providers/postgres/src/airflow/providers/postgres/hooks/postgres.py
b/providers/postgres/src/airflow/providers/postgres/hooks/postgres.py
index ec28bcc834f..66f692eb004 100644
--- a/providers/postgres/src/airflow/providers/postgres/hooks/postgres.py
+++ b/providers/postgres/src/airflow/providers/postgres/hooks/postgres.py
@@ -25,7 +25,7 @@ from typing import TYPE_CHECKING, Any, Literal, Protocol,
TypeAlias, cast, overl
from more_itertools import chunked
from psycopg2 import connect as ppg2_connect
-from psycopg2.extras import DictCursor, NamedTupleCursor, RealDictCursor,
execute_batch
+from psycopg2.extras import DictCursor, NamedTupleCursor, RealDictCursor,
execute_values
from airflow.providers.common.compat.sdk import (
AirflowException,
@@ -660,6 +660,10 @@ class PostgresHook(DbApiHook):
"""
Insert a collection of tuples into a table.
+ When ``fast_executemany=True`` with psycopg2, uses ``execute_values``
which batches
+ all rows into a single INSERT statement for better performance.
+ For psycopg3, the default ``executemany`` already uses pipelining for
high performance.
+
Rows are inserted in chunks, each chunk (of size ``commit_every``) is
done in a new transaction.
@@ -668,20 +672,29 @@ class PostgresHook(DbApiHook):
:param target_fields: The names of the columns to fill in the table
:param commit_every: The maximum number of rows to insert in one
transaction. Set to 0 to insert all rows in one transaction.
- :param replace: Whether to replace instead of insert
+ :param replace: Whether to replace instead of insert (uses ON CONFLICT)
:param executemany: If True, all rows are inserted at once in
chunks defined by the commit_every parameter. This only works if
all rows
have same number of column names, but leads to better performance.
:param fast_executemany: If True, rows will be inserted using an
optimized
- bulk execution strategy (``psycopg2.extras.execute_batch``). This
can
- significantly improve performance for large inserts. If set to
False,
- the method falls back to the default implementation from
- ``DbApiHook.insert_rows``.
+ bulk execution strategy (``psycopg2.extras.execute_values``),
unless psycopg3
+ is being used. This can significantly improve performance for
large inserts.
+ If set to False or psycopg3 is being used, the method falls back
to the default
+ implementation from ``DbApiHook.insert_rows``.
:param autocommit: What to set the connection's autocommit setting to
before executing the query.
"""
- # if fast_executemany is disabled, defer to default implementation of
insert_rows in DbApiHook
- if not fast_executemany:
+ # psycopg3's executemany already uses pipelining, so use default
implementation
+ # Only override for psycopg2 with fast_executemany to use
execute_values
+ if USE_PSYCOPG3 and fast_executemany:
+ self.log.warning(
+ "fast_executemany=True has no effect when using psycopg3. "
+ "psycopg3's executemany already uses pipelining for optimal
performance."
+ )
+ if USE_PSYCOPG3 or not fast_executemany:
+ # Reset to default format in case a previous fast_executemany call
failed
+ self._insert_statement_format = "INSERT INTO {} {} VALUES ({})"
+
return super().insert_rows(
table,
rows,
@@ -693,9 +706,11 @@ class PostgresHook(DbApiHook):
**kwargs,
)
- # if fast_executemany is enabled, use optimized execute_batch from
psycopg
+ # if fast_executemany is enabled with psycopg2, use optimized
execute_values from psycopg
+ self._insert_statement_format = "INSERT INTO {} {} VALUES %s"
+
nb_rows = 0
- sql = None # not generated unless we actually process at least one
chunk
+ sql: str | None = None # not generated unless we actually process at
least one chunk
with self._create_autocommit_connection(autocommit) as conn:
conn.commit()
with closing(conn.cursor()) as cur:
@@ -710,7 +725,7 @@ class PostgresHook(DbApiHook):
self.log.debug("Generated sql: %s", sql)
try:
- execute_batch(cur, sql, values, page_size=commit_every)
+ execute_values(cur, sql, values,
page_size=commit_every)
except Exception as e:
self.log.error("Generated sql: %s", sql)
self.log.error("Parameters: %s", values)
diff --git a/providers/postgres/tests/unit/postgres/hooks/test_postgres.py
b/providers/postgres/tests/unit/postgres/hooks/test_postgres.py
index fab8bc6f5d7..f356e3f233e 100644
--- a/providers/postgres/tests/unit/postgres/hooks/test_postgres.py
+++ b/providers/postgres/tests/unit/postgres/hooks/test_postgres.py
@@ -1012,8 +1012,8 @@ class TestPostgresHookPPG2(_BasePostgresHookRuntimeTests):
assert call_kw["sql"] == f"INSERT INTO {table} VALUES (%s)"
assert call_kw["row_count"] == 2
- @mock.patch("airflow.providers.postgres.hooks.postgres.execute_batch")
- def test_insert_rows_fast_executemany(self, mock_execute_batch):
+ @mock.patch("airflow.providers.postgres.hooks.postgres.execute_values")
+ def test_insert_rows_fast_executemany(self, mock_execute_values):
table = "table"
rows = [("hello",), ("world",)]
@@ -1025,9 +1025,9 @@ class TestPostgresHookPPG2(_BasePostgresHookRuntimeTests):
commit_count = 2 # The first and last commit
assert self.conn.commit.call_count == commit_count
- mock_execute_batch.assert_called_once_with(
+ mock_execute_values.assert_called_once_with(
self.cur,
- f"INSERT INTO {table} VALUES (%s)", # expected SQL
+ f"INSERT INTO {table} VALUES %s", # expected SQL
[("hello",), ("world",)], # expected values
page_size=1000,
)
@@ -1036,9 +1036,8 @@ class TestPostgresHookPPG2(_BasePostgresHookRuntimeTests):
self.cur.executemany.assert_not_called()
@mock.patch("airflow.providers.postgres.hooks.postgres.send_sql_hook_lineage")
- @mock.patch("airflow.providers.postgres.hooks.postgres.execute_batch")
- def test_insert_rows_fast_executemany_hook_lineage(self,
mock_execute_batch, mock_send_lineage):
-
+ @mock.patch("airflow.providers.postgres.hooks.postgres.execute_values")
+ def test_insert_rows_fast_executemany_hook_lineage(self,
mock_execute_values, mock_send_lineage):
table = "table"
rows = [("hello",), ("world",)]
@@ -1047,9 +1046,28 @@ class
TestPostgresHookPPG2(_BasePostgresHookRuntimeTests):
mock_send_lineage.assert_called_once()
call_kw = mock_send_lineage.call_args.kwargs
assert call_kw["context"] is self.db_hook
- assert call_kw["sql"] == f"INSERT INTO {table} VALUES (%s)"
+ assert call_kw["sql"] == f"INSERT INTO {table} VALUES %s"
assert call_kw["row_count"] == 2
+ @mock.patch("airflow.providers.postgres.hooks.postgres.USE_PSYCOPG3", True)
+ @mock.patch("airflow.providers.common.sql.hooks.sql.DbApiHook.insert_rows")
+ def test_insert_rows_fast_executemany_psycopg3_fallback(self,
mock_super_insert_rows):
+ """Verify psycopg3 falls back to default implementation even with
fast_executemany=True."""
+ table = "table"
+ rows = [("hello",), ("world",)]
+
+ self.db_hook.insert_rows(table, rows, fast_executemany=True)
+
+ mock_super_insert_rows.assert_called_once_with(
+ table,
+ rows,
+ target_fields=None,
+ commit_every=1000,
+ replace=False,
+ executemany=False,
+ autocommit=False,
+ )
+
@pytest.mark.usefixtures("reset_logging_config")
def test_get_all_db_log_messages(self, mocker):
messages = ["a", "b", "c"]
@@ -1207,3 +1225,17 @@ class
TestPostgresHookPPG3(_BasePostgresHookRuntimeTests):
mock_logger.info.assert_any_call("Message from db: 42")
finally:
hook.run(sql=f"DROP PROCEDURE {proc_name} (s text)")
+
+ @pytest.mark.usefixtures("reset_logging_config")
+ def test_insert_rows_fast_executemany_psycopg3_logs_warning(self, mocker):
+ mock_logger =
mocker.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.log")
+
+ table = "table"
+ rows = [("hello",), ("world",)]
+
+ self.db_hook.insert_rows(table, rows, fast_executemany=True)
+
+ mock_logger.warning.assert_called_once_with(
+ "fast_executemany=True has no effect when using psycopg3. "
+ "psycopg3's executemany already uses pipelining for optimal
performance."
+ )