I've spent some more time on this patch, cleaning up and simplifying a few things.
I've renamed "copy_for_batch_insert_threshold" to "batch_with_copy_threshold" and removed the boolean option "use_copy_for_batch_insert", so to enable COPY for batch inserts you now only need to set batch_with_copy_threshold to a number greater than 0. Also, COPY can only be used if batching is enabled (batch_size > 1), and it is only used for COPY FROM on a foreign table and for inserts into table partitions that are themselves foreign tables. -- Matheus Alcantara EDB: http://www.enterprisedb.com
From dead99e8a2db663df8676f1caca1d834e19ca076 Mon Sep 17 00:00:00 2001 From: Matheus Alcantara <[email protected]> Date: Wed, 26 Nov 2025 16:34:46 -0300 Subject: [PATCH v8] postgres_fdw: speed up batch inserts using COPY This commit includes a new foreign table/server option "batch_with_copy_threshold" that enables the use of the COPY command to speed up batch inserts when a COPY FROM or an insert into a table partition that is a foreign table is executed. In both cases the BeginForeignInsert FDW routine is called, so this new option is retrieved only in this routine. The other cases, which use the ForeignModify routines, still use INSERT as the remote SQL. Note that COPY will only be used for batch inserts and only if the number of rows being inserted in the current batch is >= batch_with_copy_threshold. For example, with batch_size=100, batch_with_copy_threshold=50 and 120 rows being inserted, the first 100 rows will be inserted using the COPY command and the remaining 20 rows will be inserted using an INSERT statement, because that final batch did not reach the copy threshold. 
--- contrib/postgres_fdw/deparse.c | 35 +++ .../postgres_fdw/expected/postgres_fdw.out | 26 +++ contrib/postgres_fdw/option.c | 6 +- contrib/postgres_fdw/postgres_fdw.c | 210 +++++++++++++++++- contrib/postgres_fdw/postgres_fdw.h | 1 + contrib/postgres_fdw/sql/postgres_fdw.sql | 23 ++ 6 files changed, 298 insertions(+), 3 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index f2fb0051843..54e821f6bf5 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -2236,6 +2236,41 @@ rebuildInsertSql(StringInfo buf, Relation rel, appendStringInfoString(buf, orig_query + values_end_len); } +/* + * Build a COPY FROM STDIN statement using the TEXT format + */ +void +deparseCopySql(StringInfo buf, Relation rel, List *target_attrs) +{ + TupleDesc tupdesc = RelationGetDescr(rel); + bool first = true; + int nattrs = list_length(target_attrs); + + appendStringInfo(buf, "COPY "); + deparseRelation(buf, rel); + if (nattrs > 0) + appendStringInfo(buf, "("); + + foreach_int(attnum, target_attrs) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoString(buf, ", "); + + first = false; + + appendStringInfoString(buf, quote_identifier(NameStr(attr->attname))); + } + if (nattrs > 0) + appendStringInfoString(buf, ") FROM STDIN"); + else + appendStringInfoString(buf, " FROM STDIN"); +} + /* * deparse remote UPDATE statement * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 48e3185b227..ffba243dece 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -9215,6 +9215,19 @@ with result as (insert into itrtest values (1, 'test1'), (2, 'test2') returning drop trigger loct1_br_insert_trigger on loct1; drop trigger loct2_br_insert_trigger on loct2; +-- Test batch insert using COPY with batch_with_copy_threshold 
+delete from itrtest; +alter server loopback options (add batch_with_copy_threshold '2', batch_size '3'); +insert into itrtest values (1, 'test1'), (2, 'test2'), (2, 'test3'); +select * from itrtest; + a | b +---+------- + 1 | test1 + 2 | test2 + 2 | test3 +(3 rows) + +alter server loopback options (drop batch_with_copy_threshold, drop batch_size); drop table itrtest; drop table loct1; drop table loct2; @@ -9524,6 +9537,19 @@ select * from rem2; 2 | bar (2 rows) +delete from rem2; +-- Test COPY with batch_with_copy_threshold +alter foreign table rem2 options (add batch_with_copy_threshold '2'); +-- Insert 3 rows so that the third row fallback to normal INSERT statement path +copy rem2 from stdin; +select * from rem2; + f1 | f2 +----+----- + 1 | foo + 2 | bar + 3 | baz +(3 rows) + delete from rem2; -- Test check constraints alter table loc2 add constraint loc2_f1positive check (f1 >= 0); diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 04788b7e8b3..d2696206e75 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -157,7 +157,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) (void) ExtractExtensionList(defGetString(def), true); } else if (strcmp(def->defname, "fetch_size") == 0 || - strcmp(def->defname, "batch_size") == 0) + strcmp(def->defname, "batch_size") == 0 || + strcmp(def->defname, "batch_with_copy_threshold") == 0) { char *value; int int_val; @@ -263,6 +264,9 @@ InitPgFdwOptions(void) /* batch_size is available on both server and table */ {"batch_size", ForeignServerRelationId, false}, {"batch_size", ForeignTableRelationId, false}, + /* batch_with_copy_threshold is available on both server and table */ + {"batch_with_copy_threshold", ForeignServerRelationId, false}, + {"batch_with_copy_threshold", ForeignTableRelationId, false}, /* async_capable is available on both server and table */ {"async_capable", ForeignServerRelationId, false}, {"async_capable", ForeignTableRelationId, false}, diff --git 
a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 06b52c65300..7896760d51a 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT( /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +/* Buffer size to send COPY IN data*/ +#define COPYBUFSIZ 8192 + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -198,6 +201,10 @@ typedef struct PgFdwModifyState bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ + /* COPY usage stuff */ + int batch_with_copy_threshold; /* value of FDW option */ + char *cmd_copy; /* COPY statement */ + /* info about parameters for prepared statement */ AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ int p_nums; /* number of parameters to transmit */ @@ -545,6 +552,10 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); static int get_batch_size_option(Relation rel); +static int get_batch_with_copy_threshold(Relation rel); +static TupleTableSlot **execute_foreign_modify_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots); /* @@ -2013,8 +2024,30 @@ postgresExecForeignBatchInsert(EState *estate, */ if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate->aux_fmstate; - rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, - slots, planSlots, numSlots); + + /* + * Check if "batch_with_copy_threshold" is enable (> 0) and if the COPY + * can be used based on the number of rows being inserted on this batch. + * The original query also should not have a RETURNING clause. 
+ */ + if (fmstate->batch_with_copy_threshold > 0 && + fmstate->batch_with_copy_threshold <= *numSlots && + !fmstate->has_returning) + { + if (fmstate->cmd_copy == NULL) + { + StringInfoData sql; + + initStringInfo(&sql); + deparseCopySql(&sql, fmstate->rel, fmstate->target_attrs); + fmstate->cmd_copy = sql.data; + } + + rslot = execute_foreign_modify_using_copy(fmstate, slots, numSlots); + } + else + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, + slots, planSlots, numSlots); /* Revert that change */ if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate; @@ -2265,6 +2298,16 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, retrieved_attrs != NIL, retrieved_attrs); + + /* + * Set batch_with_copy_threshold from foreign server/table options. We do + * this outside of create_foreign_modify() because we only want to use + * COPY as a remote SQL when a COPY FROM on a foreign table is executed or + * an insert is being performed on a table partition. In both cases the + * BeginForeignInsert fdw routine is called. 
+ */ + fmstate->batch_with_copy_threshold = get_batch_with_copy_threshold(rel); + /* * If the given resultRelInfo already has PgFdwModifyState set, it means * the foreign table is an UPDATE subplan result rel; in which case, store @@ -4066,6 +4109,50 @@ create_foreign_modify(EState *estate, return fmstate; } +/* + * Write target attribute values from fmstate into buf buffer to be sent as + * COPY FROM STDIN data + */ +static void +convert_slot_to_copy_text(StringInfo buf, + PgFdwModifyState *fmstate, + TupleTableSlot *slot) +{ + TupleDesc tupdesc = RelationGetDescr(fmstate->rel); + bool first = true; + int i = 0; + + foreach_int(attnum, fmstate->target_attrs) + { + CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1); + Datum datum; + bool isnull; + + /* Ignore generated columns; they are set to DEFAULT */ + if (attr->attgenerated) + continue; + + if (!first) + appendStringInfoCharMacro(buf, '\t'); + first = false; + + datum = slot_getattr(slot, attnum, &isnull); + + if (isnull) + appendStringInfoString(buf, "\\N"); + else + { + const char *value = OutputFunctionCall(&fmstate->p_flinfo[i], + datum); + + appendStringInfoString(buf, value); + } + i++; + } + + appendStringInfoCharMacro(buf, '\n'); +} + /* * execute_foreign_modify * Perform foreign-table modification as required, and fetch RETURNING @@ -7886,3 +7973,122 @@ get_batch_size_option(Relation rel) return batch_size; } + +/* + * Determine COPY usage threshold for batching inserts for a given foreign + * table. The option specified for a table has precedence. + */ +static int +get_batch_with_copy_threshold(Relation rel) +{ + Oid foreigntableid = RelationGetRelid(rel); + List *options = NIL; + ListCell *lc; + ForeignTable *table; + ForeignServer *server; + + /* + * We use 0 as default, which means that COPY will not be used by default + * for batching insert. + */ + int copy_for_batch_insert_threshold = 0; + + /* + * Load options for table and server. 
We append server options after table + * options, because table options take precedence. + */ + table = GetForeignTable(foreigntableid); + server = GetForeignServer(table->serverid); + + options = list_concat(options, table->options); + options = list_concat(options, server->options); + + /* See if either table or server specifies enable_batch_with_copy. */ + foreach(lc, options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "batch_with_copy_threshold") == 0) + { + (void) parse_int(defGetString(def), &copy_for_batch_insert_threshold, 0, NULL); + break; + } + } + return copy_for_batch_insert_threshold; +} + +/* + * execute_foreign_modify_using_copy + * Perform foreign-table modification using the COPY command. + */ +static TupleTableSlot ** +execute_foreign_modify_using_copy(PgFdwModifyState *fmstate, + TupleTableSlot **slots, + int *numSlots) +{ + PGresult *res; + StringInfoData copy_data; + int n_rows; + int i; + + Assert(fmstate->cmd_copy != NULL); + + /* Send COPY command */ + if (!PQsendQuery(fmstate->conn, fmstate->cmd_copy)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->cmd_copy); + + /* get the COPY result */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(res, fmstate->conn, fmstate->cmd_copy); + + /* Convert the TupleTableSlot data into a TEXT-formatted line */ + initStringInfo(&copy_data); + for (i = 0; i < *numSlots; i++) + { + convert_slot_to_copy_text(&copy_data, fmstate, slots[i]); + + /* + * Send initial COPY data if the buffer reach the limit to avoid large + * memory usage. 
+ */ + if (copy_data.len >= COPYBUFSIZ) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->cmd_copy); + resetStringInfo(&copy_data); + } + } + + /* Send the remaining COPY data */ + if (copy_data.len > 0) + { + if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0) + pgfdw_report_error(NULL, fmstate->conn, fmstate->cmd_copy); + } + + /* End the COPY operation */ + if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn)) + pgfdw_report_error(NULL, fmstate->conn, fmstate->cmd_copy); + + /* + * Get the result, and check for success. + */ + res = pgfdw_get_result(fmstate->conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(res, fmstate->conn, fmstate->cmd_copy); + + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->temp_cxt); + + *numSlots = n_rows; + + /* + * Return NULL if nothing was inserted on the remote end + */ + return (n_rows > 0) ? 
slots : NULL; +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index e69735298d7..aa54d6bba53 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel, char *orig_query, List *target_attrs, int values_end_len, int num_params, int num_rows); +extern void deparseCopySql(StringInfo buf, Relation rel, List *target_attrs); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 9a8f9e28135..f973ef07d80 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -2635,6 +2635,16 @@ with result as (insert into itrtest values (1, 'test1'), (2, 'test2') returning drop trigger loct1_br_insert_trigger on loct1; drop trigger loct2_br_insert_trigger on loct2; +-- Test batch insert using COPY with batch_with_copy_threshold +delete from itrtest; +alter server loopback options (add batch_with_copy_threshold '2', batch_size '3'); + +insert into itrtest values (1, 'test1'), (2, 'test2'), (2, 'test3'); + +select * from itrtest; + +alter server loopback options (drop batch_with_copy_threshold, drop batch_size); + drop table itrtest; drop table loct1; drop table loct2; @@ -2807,6 +2817,19 @@ select * from rem2; delete from rem2; +-- Test COPY with batch_with_copy_threshold +alter foreign table rem2 options (add batch_with_copy_threshold '2'); + +-- Insert 3 rows so that the third row fallback to normal INSERT statement path +copy rem2 from stdin; +1 foo +2 bar +3 baz +\. +select * from rem2; + +delete from rem2; + -- Test check constraints alter table loc2 add constraint loc2_f1positive check (f1 >= 0); alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0); -- 2.51.2
