From 5e8545c8641d1afbee22b446bbfcae7970480fbf Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths.dev@pm.me>
Date: Fri, 10 Oct 2025 16:07:08 -0300
Subject: [PATCH v2] postgres_fdw: Use COPY to speed up batch inserts

---
 contrib/postgres_fdw/deparse.c      |  32 +++++++
 contrib/postgres_fdw/postgres_fdw.c | 140 ++++++++++++++++++++++++++++
 contrib/postgres_fdw/postgres_fdw.h |   1 +
 3 files changed, 173 insertions(+)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index e5b5e1a5f51..3323a92617a 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -2238,6 +2238,38 @@ rebuildInsertSql(StringInfo buf, Relation rel,
 	appendStringInfoString(buf, orig_query + values_end_len);
 }
 
+/*
+ *  Build a COPY FROM STDIN statement using the TEXT format
+ */
+void
+buildCopySql(StringInfo buf, Relation rel, List *target_attrs)
+{
+	ListCell   *lc;
+	TupleDesc	tupdesc = RelationGetDescr(rel);
+	bool		first = true;
+
+	appendStringInfo(buf, "COPY ");
+	deparseRelation(buf, rel);
+	appendStringInfo(buf, "(");
+
+	foreach(lc, target_attrs)
+	{
+		int			attnum = lfirst_int(lc);
+		Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+
+		if (attr->attgenerated)
+			continue;
+
+		if (!first)
+			appendStringInfoString(buf, ", ");
+
+		first = false;
+
+		appendStringInfoString(buf, quote_identifier(NameStr(attr->attname)));
+	}
+	appendStringInfoString(buf, ") FROM STDIN");
+}
+
 /*
  * deparse remote UPDATE statement
  *
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 456b267f70b..7f345d19102 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT(
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Buffer size to send COPY IN data*/
+#define COPYBUFSIZ 8192
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -192,6 +195,7 @@ typedef struct PgFdwModifyState
 	/* extracted fdw_private data */
 	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
 	char	   *orig_query;		/* original text of INSERT command */
+	char	   *copy_query;		/* text of COPY command if it's being used */
 	List	   *target_attrs;	/* list of target attribute numbers */
 	int			values_end;		/* length up to the end of VALUES */
 	int			batch_size;		/* value of FDW option "batch_size" */
@@ -545,6 +549,9 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
 							  const PgFdwRelationInfo *fpinfo_o,
 							  const PgFdwRelationInfo *fpinfo_i);
 static int	get_batch_size_option(Relation rel);
+static TupleTableSlot **execute_foreign_insert_using_copy(PgFdwModifyState *fmstate,
+														  TupleTableSlot **slots,
+														  int *numSlots);
 
 
 /*
@@ -4066,6 +4073,50 @@ create_foreign_modify(EState *estate,
 	return fmstate;
 }
 
+/*
+ *  Write target attribute values from fmstate into buf buffer to be sent as
+ *  COPY FROM STDIN data
+ */
+static void
+convert_slot_to_copy_text(StringInfo buf,
+						  PgFdwModifyState *fmstate,
+						  TupleTableSlot *slot)
+{
+	ListCell   *lc;
+	TupleDesc	tupdesc = RelationGetDescr(fmstate->rel);
+	bool		first = true;
+
+	foreach(lc, fmstate->target_attrs)
+	{
+		int			attnum = lfirst_int(lc);
+		CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1);
+		Datum		datum;
+		bool		isnull;
+
+		/* Ignore generated columns; they are set to DEFAULT */
+		if (attr->attgenerated)
+			continue;
+
+		if (!first)
+			appendStringInfoCharMacro(buf, '\t');
+		first = false;
+
+		datum = slot_getattr(slot, attnum, &isnull);
+
+		if (isnull)
+			appendStringInfoString(buf, "\\N");
+		else
+		{
+			const char *value = OutputFunctionCall(&fmstate->p_flinfo[attnum - 1],
+												   datum);
+
+			appendStringInfoString(buf, value);
+		}
+	}
+
+	appendStringInfoCharMacro(buf, '\n');
+}
+
 /*
  * execute_foreign_modify
  *		Perform foreign-table modification as required, and fetch RETURNING
@@ -4097,6 +4148,13 @@ execute_foreign_modify(EState *estate,
 	if (fmstate->conn_state->pendingAreq)
 		process_pending_request(fmstate->conn_state->pendingAreq);
 
+	/*
+	 * Use COPY command for batch insert if the original query don't include a
+	 * RETURNING clause
+	 */
+	if (operation == CMD_INSERT && *numSlots > 1 && !fmstate->has_returning)
+		return execute_foreign_insert_using_copy(fmstate, slots, numSlots);
+
 	/*
 	 * If the existing query was deparsed and prepared for a different number
 	 * of rows, rebuild it for the proper number.
@@ -7886,3 +7944,85 @@ get_batch_size_option(Relation rel)
 
 	return batch_size;
 }
+
+/*  Execute a batch insert into a foreign table using the COPY command */
+static TupleTableSlot **
+execute_foreign_insert_using_copy(PgFdwModifyState *fmstate,
+								  TupleTableSlot **slots,
+								  int *numSlots)
+{
+	PGresult   *res;
+	StringInfoData sql;
+	StringInfoData copy_data;
+	int			n_rows;
+	int			i;
+
+	if (fmstate->copy_query == NULL)
+	{
+		/* Build COPY command */
+		initStringInfo(&sql);
+		buildCopySql(&sql, fmstate->rel, fmstate->target_attrs);
+
+		/* Cache for reuse. */
+		fmstate->copy_query = sql.data;
+	}
+
+	/* Send COPY command */
+	if (!PQsendQuery(fmstate->conn, fmstate->copy_query))
+		pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+
+	/* get the COPY result */
+	res = pgfdw_get_result(fmstate->conn);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pgfdw_report_error(res, fmstate->conn, fmstate->copy_query);
+
+	/* Convert the TupleTableSlot data into a TEXT-formatted line */
+	initStringInfo(&copy_data);
+	for (i = 0; i < *numSlots; i++)
+	{
+		convert_slot_to_copy_text(&copy_data, fmstate, slots[i]);
+
+		/*
+		 * Send initial COPY data if the buffer reach the limit to avoid large
+		 * memory usage.
+		 */
+		if (copy_data.len >= COPYBUFSIZ)
+		{
+			if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0)
+				pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+			resetStringInfo(&copy_data);
+		}
+	}
+
+	/* Send the remaining COPY data */
+	if (copy_data.len > 0)
+	{
+		if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0)
+			pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+	}
+
+	/* End the COPY operation */
+	if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn))
+		pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+
+	/*
+	 * Get the result, and check for success.
+	 */
+	res = pgfdw_get_result(fmstate->conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pgfdw_report_error(res, fmstate->conn, fmstate->copy_query);
+
+	n_rows = atoi(PQcmdTuples(res));
+
+	/* And clean up */
+	PQclear(res);
+
+	MemoryContextReset(fmstate->temp_cxt);
+
+	*numSlots = n_rows;
+
+	/*
+	 * Return NULL if nothing was inserted on the remote end
+	 */
+	return (n_rows > 0) ? slots : NULL;
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index e69735298d7..c0198b865f3 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel,
 							 char *orig_query, List *target_attrs,
 							 int values_end_len, int num_params,
 							 int num_rows);
+extern void buildCopySql(StringInfo buf, Relation rel, List *target_attrs);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
-- 
2.51.0

