Hi all,

Currently, postgres_fdw uses prepared statements to insert batches into foreign tables. Although this works fine for most use cases, the COPY command can also be used in some scenarios to speed up large batch inserts.
The attached patch implements this idea of using the COPY command for batch inserts into postgres_fdw foreign tables. I've run some benchmarks using pgbench, and the results seem good enough to consider this. I ran the benchmark with different batch_size values to see when this optimization could be useful. The following results are the best tps of 3 runs.

Command: pgbench -n -c 10 -j 10 -t 100 -f bench.sql postgres

batch_size: 10    master tps: 76.360406    patch tps: 68.917109
batch_size: 100   master tps: 123.427731   patch tps: 243.737055
batch_size: 1000  master tps: 132.500506   patch tps: 239.295132

It seems that with a batch_size of 100 or more we can get a considerable speedup for batch inserts, while with a batch_size of 10 the patch is slightly slower. The attached patch uses the COPY command whenever *numSlots > 1, but these results suggest that maybe we should have a GUC to enable this?

I also think the patch can be improved by removing the duplicated code introduced in this first version, especially in the cleanup phase, but I tried to keep things simple at this initial stage to make the review easier and just to test the idea.

Lastly, I'm not sure whether we should change the EXPLAIN (ANALYZE, VERBOSE) output for batch inserts that use COPY to mention that the COPY command is sent to the remote server. I guess so?

(This proposal is based on a patch idea written by Tomas Vondra in one of his blog posts.)

-- 
Matheus Alcantara
From d4041814ba377475a1fa36b6972d5cf5f989a38f Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Fri, 10 Oct 2025 16:07:08 -0300
Subject: [PATCH v1] postgres_fdw: Use COPY to speed up batch inserts

---
 contrib/postgres_fdw/deparse.c      |  48 +++++++++
 contrib/postgres_fdw/postgres_fdw.c | 147 ++++++++++++++++++++++++++++
 contrib/postgres_fdw/postgres_fdw.h |   1 +
 3 files changed, 196 insertions(+)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index e5b5e1a5f51..cd80bb7306a 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -2238,6 +2238,54 @@ rebuildInsertSql(StringInfo buf, Relation rel,
 	appendStringInfoString(buf, orig_query + values_end_len);
 }
 
+/*
+ * Build a COPY ... FROM STDIN statement for a batch insert into "rel".
+ *
+ * The statement uses COPY's default text format (tab as delimiter and \N for
+ * NULL), so the caller must escape the data rows accordingly.  Generated
+ * columns are left out of the column list, and the column_name FDW option
+ * is honored the same way deparseColumnRef() does.  The caller must make
+ * sure at least one non-generated column is present in target_attrs.
+ */
+void
+buildCopySql(StringInfo buf, Relation rel, List *target_attrs)
+{
+	TupleDesc	tupdesc = RelationGetDescr(rel);
+	bool		first = true;
+
+	appendStringInfoString(buf, "COPY ");
+	deparseRelation(buf, rel);
+	appendStringInfoString(buf, " (");
+
+	foreach_int(attnum, target_attrs)
+	{
+		Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+		char	   *colname = NameStr(attr->attname);
+
+		/* Omit generated columns so they take their remote default */
+		if (attr->attgenerated)
+			continue;
+
+		/* Use the column_name FDW option, if set, as the remote name */
+		foreach_node(DefElem, def,
+					 GetForeignColumnOptions(RelationGetRelid(rel), attnum))
+		{
+			if (strcmp(def->defname, "column_name") == 0)
+			{
+				colname = defGetString(def);
+				break;
+			}
+		}
+
+		if (!first)
+			appendStringInfoString(buf, ", ");
+		first = false;
+
+		appendStringInfoString(buf, quote_identifier(colname));
+	}
+	appendStringInfoString(buf, ") FROM STDIN");
+}
+
 /*
  * deparse remote UPDATE statement
  *
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 456b267f70b..985c9bc5be7 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -4066,6 +4066,86 @@ create_foreign_modify(EState *estate,
 	return fmstate;
 }
 
+/*
+ * Append "value" to "buf", escaped for COPY's text format with the default
+ * tab delimiter.  Backslash, tab, newline and carriage return would otherwise
+ * be read as escape, column or row delimiters.  Working byte by byte is safe
+ * because all server encodings are ASCII-safe.
+ */
+static void
+append_copy_text_value(StringInfo buf, const char *value)
+{
+	for (const char *p = value; *p != '\0'; p++)
+	{
+		switch (*p)
+		{
+			case '\\':
+				appendStringInfoString(buf, "\\\\");
+				break;
+			case '\t':
+				appendStringInfoString(buf, "\\t");
+				break;
+			case '\n':
+				appendStringInfoString(buf, "\\n");
+				break;
+			case '\r':
+				appendStringInfoString(buf, "\\r");
+				break;
+			default:
+				appendStringInfoChar(buf, *p);
+				break;
+		}
+	}
+}
+
+/*
+ * Append one COPY text-format data line for "slot" to "buf", with the
+ * non-generated target attributes in the order buildCopySql() lists them.
+ *
+ * The caller must have switched to fmstate->temp_cxt and called
+ * set_transmission_modes(), so that transient allocations are released per
+ * batch and values are printed in a form the remote server can parse.
+ */
+static void
+convert_slot_to_copy_text(StringInfo buf,
+						  PgFdwModifyState *fmstate,
+						  TupleTableSlot *slot)
+{
+	TupleDesc	tupdesc = RelationGetDescr(fmstate->rel);
+	int			pindex = 0;
+
+	foreach_int(attnum, fmstate->target_attrs)
+	{
+		CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1);
+		Datum		datum;
+		bool		isnull;
+
+		/* Ignore generated columns; they are set to DEFAULT */
+		if (attr->attgenerated)
+			continue;
+
+		if (pindex > 0)
+			appendStringInfoChar(buf, '\t');
+
+		datum = slot_getattr(slot, attnum, &isnull);
+		if (isnull)
+			appendStringInfoString(buf, "\\N");
+		else
+		{
+			/*
+			 * p_flinfo has one entry per non-generated target attribute, in
+			 * target_attrs order, so it is indexed by pindex, not by attnum.
+			 */
+			append_copy_text_value(buf,
+								   OutputFunctionCall(&fmstate->p_flinfo[pindex],
+													  datum));
+		}
+		pindex++;
+	}
+
+	appendStringInfoChar(buf, '\n');
+}
+
 /*
  * execute_foreign_modify
  *		Perform foreign-table modification as required, and fetch RETURNING
@@ -4097,6 +4177,73 @@ execute_foreign_modify(EState *estate,
 	if (fmstate->conn_state->pendingAreq)
 		process_pending_request(fmstate->conn_state->pendingAreq);
 
+	/*
+	 * Use COPY for batch inserts, which performs better than a multi-row
+	 * INSERT for larger batches.  That is only possible when at least one
+	 * non-generated column is sent and nothing follows the VALUES clause of
+	 * the original INSERT, since COPY can honor neither RETURNING nor ON
+	 * CONFLICT DO NOTHING.  In any other case use the prepared INSERT below.
+	 */
+	if (operation == CMD_INSERT && *numSlots > 1 &&
+		!fmstate->has_returning && fmstate->p_nums > 0 &&
+		fmstate->orig_query[fmstate->values_end] == '\0')
+	{
+		StringInfoData copy_data;
+		MemoryContext oldcontext;
+		int			nestlevel;
+
+		/* Keep all per-batch allocations in temp_cxt, which is reset below */
+		oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
+
+		/* Build and send the COPY command */
+		initStringInfo(&sql);
+		buildCopySql(&sql, fmstate->rel, fmstate->target_attrs);
+		if (!PQsendQuery(fmstate->conn, sql.data))
+			pgfdw_report_error(NULL, fmstate->conn, sql.data);
+
+		/* The remote server must now be waiting for COPY data */
+		res = pgfdw_get_result(fmstate->conn);
+		if (PQresultStatus(res) != PGRES_COPY_IN)
+			pgfdw_report_error(res, fmstate->conn, sql.data);
+		PQclear(res);
+
+		/*
+		 * Convert each slot into a text-format data line, using the same
+		 * transmission settings as convert_prep_stmt_params().
+		 *
+		 * XXX: Should we send the data in COPYBUFSIZ-sized chunks instead of
+		 * growing a single buffer for the whole batch?
+		 */
+		initStringInfo(&copy_data);
+		nestlevel = set_transmission_modes();
+		for (int i = 0; i < *numSlots; i++)
+			convert_slot_to_copy_text(&copy_data, fmstate, slots[i]);
+		reset_transmission_modes(nestlevel);
+
+		/* Send the COPY data, then end the COPY operation */
+		if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0)
+			pgfdw_report_error(NULL, fmstate->conn, sql.data);
+		if (PQputCopyEnd(fmstate->conn, NULL) <= 0 || PQflush(fmstate->conn) != 0)
+			pgfdw_report_error(NULL, fmstate->conn, sql.data);
+
+		/* Get the result, and check for success */
+		res = pgfdw_get_result(fmstate->conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(res, fmstate->conn, sql.data);
+
+		n_rows = atoi(PQcmdTuples(res));
+
+		/* And clean up */
+		PQclear(res);
+		MemoryContextSwitchTo(oldcontext);
+		MemoryContextReset(fmstate->temp_cxt);
+
+		*numSlots = n_rows;
+
+		/* Return NULL if nothing was inserted on the remote end */
+		return (n_rows > 0) ? slots : NULL;
+	}
+
 	/*
 	 * If the existing query was deparsed and prepared for a different number
 	 * of rows, rebuild it for the proper number.
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index e69735298d7..c0198b865f3 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel,
 							 char *orig_query, List *target_attrs,
 							 int values_end_len, int num_params,
 							 int num_rows);
+extern void buildCopySql(StringInfo buf, Relation rel, List *target_attrs);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
-- 
2.51.0
bench.sql
Description: application/sql
